Re: [BucketingSink] incorrect indexing of part files, when part suffix is specified


Hi Mingey!

I’ve implemented a group of tests showing that the problem exists only when a part suffix is specified and a part file in a non-final state (pending or in-progress) already exists.

Here is the exception:

testThatPartIndexIsIncrementedWhenPartSuffixIsSpecifiedAndPreviousPartFileInProgressState(org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTest)  Time elapsed: 0.018 sec  <<< ERROR!
java.io.IOException: File already exists: /var/folders/v9/r7ybtp9n4lj_6ybx5xnngyzm0000gn/T/junit8543902037302786417/junit2291904425846970077/part-0-0.my.in-progress
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:259)
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:252)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:784)
at org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:71)
at org.apache.flink.streaming.connectors.fs.StringWriter.open(StringWriter.java:69)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:587)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:458)
at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:111)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTest.testThatPartIndexIsIncremented(BucketingSinkTest.java:970)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTest.testThatPartIndexIsIncrementedWhenPartSuffixIsSpecifiedAndPreviousPartFileInProgressState(BucketingSinkTest.java:909)


You can add the following test to org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTest:

    @Test // (expected = IOException.class)
    public void testThatPartIndexIsIncrementedWhenPartSuffixIsSpecifiedAndPreviousPartFileInProgressState()
            throws Exception {
        testThatPartIndexIsIncremented(".my", "part-0-0.my" + IN_PROGRESS_SUFFIX);
    }

    private void testThatPartIndexIsIncremented(String partSuffix, String existingPartFile) throws Exception {
        File outDir = tempFolder.newFolder();
        long inactivityInterval = 100;

        java.nio.file.Path bucket = Paths.get(outDir.getPath());
        Files.createFile(bucket.resolve(existingPartFile));

        String basePath = outDir.getAbsolutePath();
        BucketingSink<String> sink = new BucketingSink<String>(basePath)
            .setBucketer(new BasePathBucketer<>())
            .setInactiveBucketCheckInterval(inactivityInterval)
            .setInactiveBucketThreshold(inactivityInterval)
            .setPartPrefix(PART_PREFIX)
            .setInProgressPrefix("")
            .setPendingPrefix("")
            .setValidLengthPrefix("")
            .setInProgressSuffix(IN_PROGRESS_SUFFIX)
            .setPendingSuffix(PENDING_SUFFIX)
            .setValidLengthSuffix(VALID_LENGTH_SUFFIX)
            .setPartSuffix(partSuffix)
            .setBatchSize(0);

        OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(sink, 1, 0);
        testHarness.setup();
        testHarness.open();

        testHarness.setProcessingTime(0L);

        testHarness.processElement(new StreamRecord<>("test1", 1L));

        testHarness.setProcessingTime(101L);
        testHarness.snapshot(0, 0);
        testHarness.notifyOfCompletedCheckpoint(0);
        sink.close();

        String expectedFileName = partSuffix == null ? "part-0-1" : "part-0-1" + partSuffix;
        // assertThat(Files.exists(bucket.resolve(expectedFileName)), is(true));
    }

and check that the test fails.

This applies to the current master branch. I’ve also opened a PR that fixes the problem (https://github.com/apache/flink/pull/6176).

For some reason, I still couldn’t compile the whole Flink repository to run your example locally from the IDE, but from my point of view the problem exists, and the test above shows its existence. Please have a look.

I’m working on building the Flink project on my local machine …

Thx


On 25 Jun 2018, at 10:44, Rinat <r.sharipov@xxxxxxxxxxxxx> wrote:

Hi Mingey!

Thanks for your reply. I really have no idea why everything works in your case. I have implemented unit tests in my PR which show that the problem exists. Please let me know which Flink version you use.
The fix targets the current master branch. Here is an example of a unit test that shows the problem:

    @Test
    public void testThatPartIndexIsIncrementedWhenPartSuffixIsSpecified() throws Exception {
        String partSuffix = ".my";

        File outDir = tempFolder.newFolder();
        long inactivityInterval = 100;

        // Pre-create a pending part file with index 0 so the sink has to pick index 1.
        java.nio.file.Path bucket = Paths.get(outDir.getPath());
        Files.createFile(bucket.resolve("part-0-0.my.pending"));

        String basePath = outDir.getAbsolutePath();
        BucketingSink<String> sink = new BucketingSink<String>(basePath)
            .setBucketer(new BasePathBucketer<>())
            .setInactiveBucketCheckInterval(inactivityInterval)
            .setInactiveBucketThreshold(inactivityInterval)
            .setPartPrefix(PART_PREFIX)
            .setInProgressPrefix("")
            .setPendingPrefix("")
            .setValidLengthPrefix("")
            .setInProgressSuffix(IN_PROGRESS_SUFFIX)
            .setPendingSuffix(PENDING_SUFFIX)
            .setValidLengthSuffix(VALID_LENGTH_SUFFIX)
            .setPartSuffix(partSuffix)
            .setBatchSize(0);

        OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(sink, 1, 0);
        testHarness.setup();
        testHarness.open();

        testHarness.setProcessingTime(0L);

        testHarness.processElement(new StreamRecord<>("test1", 1L));

        testHarness.setProcessingTime(101L);
        testHarness.snapshot(0, 0);
        testHarness.notifyOfCompletedCheckpoint(0);
        sink.close();

        // The final file name includes the part suffix.
        assertThat(Files.exists(bucket.resolve("part-0-1" + partSuffix)), is(true));
    }

On 24 Jun 2018, at 06:02, zhangminglei <18717838093@xxxxxxx> wrote:

Hi, Rinat

I tried the situation you described and it works fine for me. The partCounter is incremented as we hoped. When a new part file is created, I did not see any duplicate part index. Here is my code for that; you can take a look.
In my case, the highest index among the finished part files is part-0-683PartSuffix. Beyond that, the files remain as _part-0-684PartSuffix.pending, _part-0-685PartSuffix.pending and so on, since the checkpoint has not finished.

Cheers
Minglei.

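    import java.util.UUID;

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.api.java.tuple.Tuple4;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

    public class TestSuffix {
        public static void main(String[] args) throws Exception {
            ParameterTool params = ParameterTool.fromArgs(args);
            String outputPath = params.getRequired("outputPath");

            StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();

            sEnv.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"));
            sEnv.enableCheckpointing(200);
            sEnv.setParallelism(1);

            // Small batch size so that part files roll over frequently.
            BucketingSink<Tuple4<Integer, String, String, Integer>> sink =
                new BucketingSink<Tuple4<Integer, String, String, Integer>>(outputPath)
                    .setInactiveBucketThreshold(1000)
                    .setInactiveBucketCheckInterval(1000)
                    .setPartSuffix("PartSuffix")
                    .setBatchSize(500);

            sEnv.addSource(new DataGenerator())
                .keyBy(0)
                .map(new CountUpRichMap())
                .addSink(sink);

            sEnv.execute();
        }

        // Appends a per-key running count to each record.
        public static class CountUpRichMap extends RichMapFunction<Tuple3<Integer, String, String>, Tuple4<Integer, String, String, Integer>> {

            private ValueState<Integer> counter;

            @Override
            public void open(Configuration parameters) throws Exception {
                counter = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Types.INT));
            }

            @Override
            public Tuple4<Integer, String, String, Integer> map(Tuple3<Integer, String, String> value) throws Exception {
                Integer counterValue = counter.value();
                if (counterValue == null) {
                    counterValue = 0;
                }
                counter.update(counterValue + 1);
                return Tuple4.of(value.f0, value.f1, value.f2, counterValue);
            }
        }

        // Emits 10000 records spread over 10 keys, then finishes.
        public static class DataGenerator implements SourceFunction<Tuple3<Integer, String, String>> {

            public DataGenerator() {
            }

            @Override
            public void run(SourceContext<Tuple3<Integer, String, String>> ctx) throws Exception {
                for (int i = 0; i < 10000; i++) {
                    ctx.collect(Tuple3.of(i % 10, UUID.randomUUID().toString(), "Some payloads......"));
                }
            }

            @Override
            public void cancel() {

            }
        }
    }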




On 16 Jun 2018, at 22:21, Rinat <r.sharipov@xxxxxxxxxxxxx> wrote:

Hi mates, since the 1.5 release, BucketingSink can be configured with a suffix for part files. This is very useful when a specific file extension is required.

While using it, I found an issue: when a new part file is created, it gets the same part index as the file that was just closed.
So when Flink tries to move it into the final state, we get a FileAlreadyExistsException.

The problem is related to the following code.
Here we look for the first part index for which no file exists in the bucket directory. The problem is that partSuffix is not included when the path is assembled. This means the checked path never exists, so partCounter is never incremented.

    Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
    while (fs.exists(partPath) ||
            fs.exists(getPendingPathFor(partPath)) ||
            fs.exists(getInProgressPathFor(partPath))) {
        bucketState.partCounter++;
        partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
    }

    bucketState.creationTime = processingTimeService.getCurrentProcessingTime();

Just before the writer is created, partSuffix is appended here, but it should already have been appended before the index checks:

    if (partSuffix != null) {
        partPath = partPath.suffix(partSuffix);
    }

I’ll create an issue and try to submit a fix.
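
For illustration, here is a minimal self-contained sketch of the index search with the suffix applied before the existence checks. It is not the code from the PR: it uses java.nio.file instead of the Hadoop FileSystem so that it runs on its own, and the class PartIndexSketch, its method names, and the ".pending" / ".in-progress" state suffixes are assumptions chosen to match the file names that appear in this thread.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical stand-in for the index search in BucketingSink.openNewPartFile.
class PartIndexSketch {

    // Assumed state suffixes, matching the file names seen in this thread.
    static final String IN_PROGRESS_SUFFIX = ".in-progress";
    static final String PENDING_SUFFIX = ".pending";

    // Returns the first counter >= startCounter whose file name is unused in every state.
    static int nextFreePartCounter(Path bucketDir, String partPrefix, int subtaskIndex,
                                   String partSuffix, int startCounter) {
        int counter = startCounter;
        while (isTaken(bucketDir, partName(partPrefix, subtaskIndex, counter, partSuffix))) {
            counter++;
        }
        return counter;
    }

    // The part suffix is appended before any existence check is made.
    static String partName(String partPrefix, int subtaskIndex, int counter, String partSuffix) {
        String name = partPrefix + "-" + subtaskIndex + "-" + counter;
        return partSuffix == null ? name : name + partSuffix;
    }

    // A name is taken if a final, pending, or in-progress file with that name exists.
    static boolean isTaken(Path bucketDir, String name) {
        return Files.exists(bucketDir.resolve(name))
            || Files.exists(bucketDir.resolve(name + PENDING_SUFFIX))
            || Files.exists(bucketDir.resolve(name + IN_PROGRESS_SUFFIX));
    }

    public static void main(String[] args) throws IOException {
        Path bucket = Files.createTempDirectory("bucket");
        Files.createFile(bucket.resolve("part-0-0.my.pending"));

        // Suffix included in the lookup: index 0 is detected as taken.
        System.out.println(nextFreePartCounter(bucket, "part", 0, ".my", 0)); // prints 1
        // Suffix left out of the lookup (the behaviour described above): index 0 looks free.
        System.out.println(nextFreePartCounter(bucket, "part", 0, null, 0));  // prints 0
    }
}
```

The second call mirrors the lookup described above, where the suffix is missing from the checked path, so the existing part-0-0.my.pending file goes unnoticed and index 0 is reused.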

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

mobile: +7 (925) 416-37-26

CleverDATA
make your data clever


