Re: Table with field based partitioning must have a schema


Shall I file a GitHub issue for that? Although I guess that’s something on the BigQuery side rather than in Beam, right?

Thanks!!
On Sat, 24 Mar 2018 at 22:50, Ted Yu <yuzhihong@xxxxxxxxx> wrote:
It seems an improvement can be made: when CREATE_NEVER is set, a table with field-based partitioning shouldn't have to be associated with a schema.
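
To make the suggested rule concrete, here is a minimal sketch in plain Scala. It is only a toy model: `CreateDisposition`, `CreateNever`, `CreateIfNeeded` and `SchemaRule` are hypothetical names defined here for illustration, not Beam or BigQuery classes, and the thread does not establish on which side the real check lives.

```scala
// Hypothetical model of the proposed rule; none of these names come from Beam or BigQuery.
sealed trait CreateDisposition
case object CreateNever extends CreateDisposition
case object CreateIfNeeded extends CreateDisposition

object SchemaRule {
  // Proposed rule: a schema is only needed when the job may have to create the table.
  def schemaRequired(disposition: CreateDisposition): Boolean = disposition match {
    case CreateNever    => false
    case CreateIfNeeded => true
  }

  // Validates a load request for a field-partitioned table against the proposed rule.
  // The schema is modelled as an optional JSON string.
  def validate(disposition: CreateDisposition, schema: Option[String]): Either[String, Unit] =
    if (schemaRequired(disposition) && schema.isEmpty)
      Left("Table with field based partitioning must have a schema")
    else
      Right(())
}
```

Under this model, `SchemaRule.validate(CreateNever, None)` succeeds, while `SchemaRule.validate(CreateIfNeeded, None)` yields the error reported in this thread.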

Cheers

On Sat, Mar 24, 2018 at 2:30 PM, Carlos Alonso <carlos@xxxxxxxxxxxxx> wrote:
Otherwise the BQ load job fails with the above error as well (Table with field based partitioning must have a schema).
On Sat, 24 Mar 2018 at 15:52, Eugene Kirpichov <kirpichov@xxxxxxxxxx> wrote:
Hmm, glad it worked, but - if your create disposition was CREATE_NEVER, then why implement getSchema at all?


On Sat, Mar 24, 2018, 7:01 AM Carlos Alonso <carlos@xxxxxxxxxxxxx> wrote:
The thing is that the previous log "Returning schema for ..." never appears, so I don't think anything will appear in the log if I log what you suggest either.

Actually, after a couple more attempts, I changed the createDisposition of the transform (from CREATE_NEVER to CREATE_IF_NEEDED) and it magically worked... So I guess there's something wrong when CREATE_NEVER is set, or something I don't understand...

FYI my BigQueryIO looks like this:
BigQueryIO.write()
  .to(new JsonRouter(dataset))
  .withFormatFunction(i => JsonRouter.jsonToRow(i._1))
  .withCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED)
  .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
  .withMethod(Write.Method.FILE_LOADS)

Thanks!

On Fri, Mar 23, 2018 at 11:08 PM Eugene Kirpichov <kirpichov@xxxxxxxxxx> wrote:
Can you try logging the result of your BigQueryUtil.parseSchema and confirm that it is always non-empty? What does the result look like for the table that's failing to load?
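
A minimal sketch of such a check, assuming the parsed value is a `com.google.api.services.bigquery.model.TableSchema` (as the `getSchema` signature quoted below suggests). `SchemaCheck` is a hypothetical helper name, not something from Beam or from the original code.

```scala
import com.google.api.services.bigquery.model.TableSchema

// Hypothetical helper; not part of Beam or of the code in this thread.
object SchemaCheck {
  // True only when the schema object exists and declares at least one field.
  // getFields returns a java.util.List that can be null on a freshly built schema.
  def isNonEmpty(schema: TableSchema): Boolean =
    schema != null && schema.getFields != null && !schema.getFields.isEmpty

  // Short summary suitable for a log line.
  def describe(schema: TableSchema): String =
    if (isNonEmpty(schema)) s"${schema.getFields.size} field(s)" else "EMPTY schema"
}
```

The result of `SchemaCheck.describe(...)` could be logged right after parsing, next to the table name, to see which destination ends up with an empty schema.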

On Fri, Mar 23, 2018 at 6:01 PM Carlos Alonso <carlos@xxxxxxxxxxxxx> wrote:
Hi everyone!!

When trying to insert into BigQuery using dynamic destinations I get this error: "Table with field based partitioning must have a schema". It suggests that I'm not providing a schema, and I don't understand why, as I think I am. You can find the full stack trace here: https://pastebin.com/Q1jF024B and the code of the DynamicDestinations implementation below. Basically I'm dumping a PubSub stream of heterogeneous Json documents into BQ, routing each type to its corresponding table.

The tuples contain the Json document and the schema for the corresponding table (the tuple is composed in a previous transform from a side input, as the schema is read from BQ using the BigQueryClient class). The destination, a KV[String, String], is supposed to hold the table name as key and the schema as value.

The logs show many entries for "Returning destination for...", a few "Returning table..." entries, and no "Returning schema for..." at all, which may explain why BQ complains that no schema is provided. The question would then be: why is that method never invoked?

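import com.google.api.services.bigquery.model.TableSchema
import org.apache.beam.sdk.coders.{Coder, KvCoder, StringUtf8Coder}
import org.apache.beam.sdk.io.gcp.bigquery.{DynamicDestinations, TableDestination}
import org.apache.beam.sdk.values.{KV, ValueInSingleWindow}

// Json, BigQueryUtil, BQTypesRouter and the JsonRouter companion object
// (log, jsonToTableName) are project classes not shown here.
class JsonRouter(dataset: String)
    extends DynamicDestinations[(Json, String), KV[String, String]] {

  import JsonRouter._

  // Destination = (table name, schema as a String) derived from the element.
  override def getDestination(element: ValueInSingleWindow[(Json, String)]): KV[String, String] = {
    log.debug(s"Returning destination for ${element.getValue}")
    KV.of(jsonToTableName(element.getValue._1), element.getValue._2)
  }

  // Parses the schema carried in the destination's value.
  override def getSchema(element: KV[String, String]): TableSchema = {
    log.debug(s"Returning schema for ${element.getKey}")
    BigQueryUtil.parseSchema(element.getValue)
  }

  // Builds the time-partitioned table destination from the table name.
  override def getTable(element: KV[String, String]): TableDestination = {
    log.debug(s"Returning table for ${element.getKey}")
    new TableDestination(s"$dataset.${element.getKey}", s"Table to store ${element.getKey}",
      BQTypesRouter.TimePartitioning)
  }

  override def getDestinationCoder: Coder[KV[String, String]] =
    KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())
}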

Thanks!!