osdir.com

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

delay one of the datastream when performing join operation on event-time and watermark


Hi,
I have two data sources one is  for order data and another one is for invoice data, these two data i am pushing into kafka topic in json form. I wanted to delay order data for 5 mins because invoice data comes only after order data is generated. So, for that i have written a flink program which will take these two data from kafka and apply watermarks and delay order data for 5 mins. After applying watermarks on these data, i wanted to join these data based on order_id which is present in both order and invoice data. After Joining i wanted to push it to kafka in different topic.

But, i am not able to join these data streams with 5 min delay and i am not able to figure it out.

I am attaching my flink program below and it's dependency.

Attachment: FlinkEvent.java
Description: Binary data

<project xmlns="http://maven.apache.org/POM/4.0.0";
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";>
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.flink.streaming</groupId>
	<artifactId>flinkJoin</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>flinkJoin</name>
	<url>http://maven.apache.org</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>1.8</maven.compiler.source>
		<maven.compiler.target>1.8</maven.compiler.target>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
			<version>1.7.5</version>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka_2.11</artifactId>
			<version>1.7.0</version>
			<exclusions>
				<exclusion>
					<groupId>org.slf4j</groupId>
					<artifactId>slf4j-log4j12</artifactId>
				</exclusion>
				<exclusion>
					<groupId>log4j</groupId>
					<artifactId>log4j</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java_2.11</artifactId>
			<version>1.7.0</version>
			<exclusions>
				<exclusion>
					<groupId>org.slf4j</groupId>
					<artifactId>slf4j-log4j12</artifactId>
				</exclusion>
				<exclusion>
					<groupId>log4j</groupId>
					<artifactId>log4j</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-scala_2.11</artifactId>
			<version>1.7.0</version>
			<exclusions>
				<exclusion>
					<groupId>org.slf4j</groupId>
					<artifactId>slf4j-log4j12</artifactId>
				</exclusion>
				<exclusion>
					<groupId>log4j</groupId>
					<artifactId>log4j</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.json</groupId>
			<artifactId>json</artifactId>
			<version>20180813</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients_2.11</artifactId>
			<version>1.7.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-runtime_2.11</artifactId>
			<version>1.7.0</version>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-assembly-plugin</artifactId>
				<version>2.3</version>
				<configuration>
					<descriptors>
						<descriptor>src/main/assembly/default.xml</descriptor>
					</descriptors>
				</configuration>
				<executions>
					<execution>
						<id>assembly</id>
						<phase>package</phase>
						<goals>
							<goal>single</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
</project>