Re: Table with field based partitioning must have a schema


Can you try logging the result of your BigQueryUtil.parseSchema and confirm that it is always non-empty? What does the result look like for the table that's failing to load?
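For example, a small helper along these lines could log the parsed field names and fail fast when the schema is empty. This is a hypothetical sketch: SchemaCheck, fieldNames and requireNonEmpty are illustrative names, not part of Beam or of your code. It only assumes the com.google.api.services.bigquery.model.TableSchema class that getSchema already returns.

```scala
import com.google.api.services.bigquery.model.TableSchema
import scala.collection.JavaConverters._

// Hypothetical helper (not part of Beam or the original code): summarizes and
// validates the TableSchema produced by BigQueryUtil.parseSchema.
object SchemaCheck {
  // Field names of the schema; empty when the schema or its field list is null.
  def fieldNames(schema: TableSchema): List[String] =
    Option(schema)
      .flatMap(s => Option(s.getFields))
      .map(_.asScala.toList.map(_.getName))
      .getOrElse(Nil)

  // Returns the schema unchanged, or throws IllegalArgumentException if it has
  // no fields, so a bad schema fails in getSchema with the table name attached.
  def requireNonEmpty(table: String, schema: TableSchema): TableSchema = {
    val names = fieldNames(schema)
    require(names.nonEmpty, s"Schema for table $table is null or has no fields")
    schema
  }
}
```

Inside getSchema you could log SchemaCheck.fieldNames(schema) together with element.getKey and then return SchemaCheck.requireNonEmpty(element.getKey, schema) instead of the raw parse result, so an empty schema shows up as an exception naming the table rather than only as the BigQuery error.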

On Fri, Mar 23, 2018 at 6:01 PM Carlos Alonso <carlos@xxxxxxxxxxxxx> wrote:
Hi everyone!!

When trying to insert into BigQuery using dynamic destinations, I get this error: "Table with field based partitioning must have a schema". That suggests I'm not providing a schema, and I don't understand why, because I think I am. The full stack trace is at https://pastebin.com/Q1jF024B, and the code of my DynamicDestinations implementation is below. Basically, I'm dumping a PubSub stream of heterogeneous JSON documents into BQ and routing each document type to its corresponding table.

Each tuple contains the JSON document and the schema of its corresponding table (the tuple is built in an earlier transform from a side input, since the schema is read from BQ using the BigQueryClient class). The destination KV[String, String] is supposed to hold the table name as the key and the schema as the value.
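The side-input lookup described above can be made to fail loudly when a table has no schema, instead of passing an empty value on to the destination. A minimal sketch, assuming the side input is available as a Map from table name to schema JSON and using String in place of the Json document type; SchemaJoin and withSchema are hypothetical names, not the actual transform:

```scala
// Hypothetical sketch of the lookup step: pair a document with the schema JSON
// of its table, taken from a side-input map (table name -> schema JSON).
// Returns Left with a reason when the schema is missing or blank.
object SchemaJoin {
  def withSchema(
      tableName: String,
      doc: String,
      schemas: Map[String, String]): Either[String, (String, String)] =
    schemas.get(tableName) match {
      case Some(schemaJson) if schemaJson.trim.nonEmpty => Right((doc, schemaJson))
      case Some(_) => Left(s"Empty schema for table $tableName")
      case None    => Left(s"No schema in side input for table $tableName")
    }
}
```

Logging or counting the Left results would show whether some documents reach the writer without a usable schema.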

The logs show many "Returning destination for..." entries, a few "Returning table..." ones, and no "Returning schema for..." entries at all. That may explain why BQ complains that no schema is provided, so the question becomes: why is that method never invoked?

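import com.google.api.services.bigquery.model.TableSchema
import org.apache.beam.sdk.coders.{Coder, KvCoder, StringUtf8Coder}
import org.apache.beam.sdk.io.gcp.bigquery.{DynamicDestinations, TableDestination}
import org.apache.beam.sdk.values.{KV, ValueInSingleWindow}
// Json, BigQueryUtil, BQTypesRouter and the JsonRouter companion object
// (which provides log and jsonToTableName) are defined elsewhere in the project.

// Routes each (document, schema JSON) tuple to a destination keyed by
// table name, carrying that table's schema JSON as the value.
class JsonRouter(dataset: String)
    extends DynamicDestinations[(Json, String), KV[String, String]] {

  import JsonRouter._

  // Table name derived from the document; schema JSON from the tuple's second field.
  override def getDestination(element: ValueInSingleWindow[(Json, String)]): KV[String, String] = {
    log.debug(s"Returning destination for ${element.getValue}")
    KV.of(jsonToTableName(element.getValue._1), element.getValue._2)
  }

  // Parses the schema JSON carried in the destination value.
  override def getSchema(element: KV[String, String]): TableSchema = {
    log.debug(s"Returning schema for ${element.getKey}")
    BigQueryUtil.parseSchema(element.getValue)
  }

  // Table spec "dataset.table", a description, and the time partitioning settings.
  override def getTable(element: KV[String, String]): TableDestination = {
    log.debug(s"Returning table for ${element.getKey}")
    new TableDestination(s"$dataset.${element.getKey}", s"Table to store ${element.getKey}",
      BQTypesRouter.TimePartitioning)
  }

  // Explicit coder for the (table name, schema JSON) destination pair.
  override def getDestinationCoder: Coder[KV[String, String]] =
    KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())
}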

Thanks!!