
Re: FileIO - How to get directory per hour

Hi Ankur,
FileIO.write() with FileNaming is the right combination. It should be something like:
.apply(FileIO.write()
    .via(...)
    .to("gs://bucket-name/prefixdir")
    .withNumShards(1)
    .withNaming((window, pane, numShardsIgnored, shardIndexIgnored, compressionIgnored) ->
        ...construct a string like "$year/$month/$day/$hour/filename-$pane.pb"...))
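
As a rough sketch of the string construction inside that lambda, the helper below builds the relative path from a window start and a pane index using only java.time. The names HourlyPaths and hourlyPath are made up for illustration, and UTC is an assumption. In the FileNaming lambda you would presumably pass ((IntervalWindow) window).start().getMillis() and pane.getIndex(), which assumes the pipeline windows the data into hourly IntervalWindows.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Beam: builds "yyyy/MM/dd/HH/filename-<pane>.pb".
final class HourlyPaths {
    // Directory layout year/month/day/hour, formatted in UTC (an assumption).
    private static final DateTimeFormatter HOUR_DIR =
        DateTimeFormatter.ofPattern("yyyy/MM/dd/HH").withZone(ZoneOffset.UTC);

    // windowStartMillis: start of the window in epoch milliseconds.
    // paneIndex: index of the pane (firing) within that window.
    static String hourlyPath(long windowStartMillis, long paneIndex) {
        return HOUR_DIR.format(Instant.ofEpochMilli(windowStartMillis))
            + "/filename-" + paneIndex + ".pb";
    }
}
```

The returned string is relative, so it should end up under whatever directory is given to .to(...).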

Please note that data for a given date/time may arrive more than once, for example when late data triggers another firing of the same window. FileIO cannot append to existing files (and files on GCS cannot be appended to anyway). That means your filename MUST include the pane info (hence the "-$pane" in my example above).
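
To make the collision concrete, here is a small stand-alone sketch (plain Java, no Beam) that counts the distinct filenames produced when the same hourly window fires several times. The class and method names are hypothetical and the UTC formatting is an assumption. Without the pane index every firing maps to the same name, while with it each firing gets its own file.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.HashSet;
import java.util.Set;

// Hypothetical demo class, for illustration only.
final class PaneNamingDemo {
    private static final DateTimeFormatter HOUR_DIR =
        DateTimeFormatter.ofPattern("yyyy/MM/dd/HH").withZone(ZoneOffset.UTC);

    // Name that ignores the pane: identical for every firing of a window.
    static String withoutPane(Instant windowStart) {
        return HOUR_DIR.format(windowStart) + "/filename.pb";
    }

    // Name that includes the pane index: unique per firing of a window.
    static String withPane(Instant windowStart, long paneIndex) {
        return HOUR_DIR.format(windowStart) + "/filename-" + paneIndex + ".pb";
    }

    // Number of distinct names generated by `firings` panes of one window.
    static int distinctNames(boolean includePane, Instant windowStart, int firings) {
        Set<String> names = new HashSet<>();
        for (long pane = 0; pane < firings; pane++) {
            names.add(includePane ? withPane(windowStart, pane) : withoutPane(windowStart));
        }
        return names.size();
    }
}
```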

On Wed, Apr 25, 2018 at 5:02 PM Ankur Chauhan <ankur@xxxxxxxxxxxx> wrote:

Hi all,

I am working through a use case where I would like to write events from PubSubIO to GCS. The events are protobuf events so I used a custom FileIO.Sink which is defined as:

    // delimited writer
    static class ProtobufFileIOSink<T extends Message> implements FileIO.Sink<T> {
        @Nullable private transient CodedOutputStream cos;

        public void open(WritableByteChannel channel) {
            this.cos = CodedOutputStream.newInstance(Channels.newOutputStream(channel));
        }

        // Parameter type changed from Message to T so that this implements Sink<T>.write.
        public void write(T element) throws IOException {
            if (element == null) {
                ...
            }
            if (cos == null) {
                ...
            }
            ...
        }

        public void flush() throws IOException {
            if (cos != null) {
                ...
            }
        }
    }

The pipeline that I am using is this:

p.apply(PubsubIO.readProtos(Billing.BillingMeasurement.class).fromSubscription(input + "-billing.proto").withIdAttribute("event_id"))
 .apply(FileIO.<Billing.BillingMeasurement>write().via(new ProtobufFileIOSink<>()).withCompression(Compression.GZIP).to(output + "/billing.proto/").withSuffix(".pb"));

To make sifting through the output easier, I would like the resulting files to be organized by year/month/day/hour, so that each hour's output looks like:


I tried looking through FileIO.writeDynamic() and FileNaming, but I am not sure whether that is the right place. Is there an example or another implementation that someone can point me to that would be a good place to start?

— Ankur Chauhan