osdir.com


Error while reading from a Hadoop sequence file


Hi,
I have been having trouble reading from an HDFS sequence file.

This is my code snippet:

DataSource<Tuple2<Text, Text>> input = env
        .createInput(HadoopInputs.readSequenceFile(Text.class, Text.class, ravenDataDir));

When I run this in YARN cluster mode, I get the following error:

The type returned by the input format could not be automatically determined. Please specify the TypeInformation of the produced type explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.
org.apache.flink.api.java.ExecutionEnvironment.createInput(ExecutionEnvironment.java:551)
flipkart.EnrichementFlink.main(EnrichementFlink.java:31)

When I specify the TypeInformation myself as follows, I run into the same issue:

DataSource<Tuple2<Text, Text>> input = env
        .createInput(HadoopInputs.readSequenceFile(Text.class, Text.class, ravenDataDir),
                TypeInformation.of(new TypeHint<Tuple2<Text, Text>>() {}));



When I add the following library to the lib folder:

flink-hadoop-compatibility_2.11-1.7.0.jar

the error changes to this:

java.lang.NoClassDefFoundError: org/apache/flink/api/common/typeutils/TypeSerializerSnapshot
at org.apache.flink.api.java.typeutils.WritableTypeInfo.createSerializer(WritableTypeInfo.java:111)
at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:107)
at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:52)
at org.apache.flink.optimizer.postpass.JavaApiPostPass.createSerializer(JavaApiPostPass.java:283)
at org.apache.flink.optimizer.postpass.JavaApiPostPass.traverseChannel(JavaApiPostPass.java:252)
at org.apache.flink.optimizer.postpass.JavaApiPostPass.traverse(JavaApiPostPass.java:97)
at org.apache.flink.optimizer.postpass.JavaApiPostPass.postPass(JavaApiPostPass.java:81)
at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:527)
at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:399)
at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:379)
at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:906)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:473)
at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
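
One way to narrow down a NoClassDefFoundError like the one above is to print, from the job's main() method, whether the missing class is visible at all and which jar each related Flink class is loaded from. As far as I know, TypeSerializerSnapshot was introduced in Flink 1.7, so one plausible (unconfirmed) suspect is a Flink runtime older than the 1.7.0 compatibility jar; the post does not say which Flink version the cluster runs. The following is a minimal, JDK-only sketch; ClasspathProbe and describe are hypothetical names, not Flink APIs:

```java
import java.security.CodeSource;

public class ClasspathProbe {

    /**
     * Reports whether a class is visible to the given class loader and, if so,
     * where it was loaded from. Class.forName(name, false, loader) loads the
     * class without running its static initializers.
     */
    public static String describe(String className, ClassLoader loader) {
        try {
            Class<?> clazz = Class.forName(className, false, loader);
            CodeSource source = clazz.getProtectionDomain().getCodeSource();
            // Classes loaded by the bootstrap loader (e.g. JDK classes) have no code source.
            String location = (source == null) ? "bootstrap/JDK" : String.valueOf(source.getLocation());
            return className + " -> " + location;
        } catch (ClassNotFoundException | LinkageError e) {
            // LinkageError covers NoClassDefFoundError, e.g. when a class it depends on is missing.
            return className + " -> NOT FOUND (" + e + ")";
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        String[] classNames = {
            // The class reported missing in the stack trace.
            "org.apache.flink.api.common.typeutils.TypeSerializerSnapshot",
            // Provided by flink-hadoop-compatibility (WritableTypeInfo appears in the trace).
            "org.apache.flink.api.java.typeutils.WritableTypeInfo",
            // From the Flink Java API; its location shows which Flink jar the client actually uses.
            "org.apache.flink.api.java.ExecutionEnvironment"
        };
        for (String name : classNames) {
            System.out.println(describe(name, loader));
        }
    }
}
```

Because the exception is thrown while ClusterClient builds the optimized plan, running this at the start of main() reports the client-side classpath, which is the one involved in this particular error. If the compatibility jar and the Flink runtime jar turn out to come from different Flink releases, that mismatch would be the first thing to rule out.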


Can someone help me resolve this issue?

Thanks,
Akshay