osdir.com

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Trying to write to parquet file (kafka as a source) yields thousands of "in progress" files


Hi,
I am trying to read from kafka and write to parquet. But I am getting thousands of ".part-0-0in progress..." files (and counting ...) 
is that a bug or am I doing something wrong?

object StreamParquet extends App {
  implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(100)  env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
  env.getCheckpointConfig.setCheckpointTimeout(600)
  env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
  env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
  env.setParallelism(1)
val consumer = new FlinkKafkaConsumer011[Address](SOURCE_TOPIC, new AddressSchema(), consumerProperties)
  val stream: DataStreamSource[Address] = env.addSource(QueueImpl.consumer)
  val outputPath = "streaming_files"
  val sink = StreamingFileSink.forBulkFormat(
    new Path(outputPath),
    ParquetAvroWriters.forReflectRecord(classOf[Address])).build()
  stream.addSink(sink)
  env.execute("Write to file")
}