
FileIO - How to get directory per hour


Hi all,

I am working on a use case where I want to write events read with PubsubIO to GCS. The events are protobuf messages, so I wrote a custom FileIO.Sink, defined as:

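    import com.google.protobuf.CodedOutputStream;
    import com.google.protobuf.Message;
    import java.io.IOException;
    import java.nio.channels.Channels;
    import java.nio.channels.WritableByteChannel;
    import javax.annotation.Nullable;
    import org.apache.beam.sdk.io.FileIO;

    // Length-delimited writer: each message is prefixed with its varint-encoded size,
    // so the messages in a file can be read back one by one with parseDelimitedFrom().
    static class ProtobufFileIOSink<T extends Message> implements FileIO.Sink<T> {
        @Nullable private transient CodedOutputStream cos;

        @Override
        public void open(WritableByteChannel channel) {
            this.cos = CodedOutputStream.newInstance(Channels.newOutputStream(channel));
        }

        @Override
        public void write(T element) throws IOException {
            if (element == null) {
                return;
            }
            if (cos == null) {
                throw new IllegalStateException("write() called before open()");
            }
            // Size prefix first, then the message bytes (the delimited wire format).
            cos.writeUInt32NoTag(element.getSerializedSize());
            element.writeTo(cos);
        }

        @Override
        public void flush() throws IOException {
            // CodedOutputStream buffers internally; push everything to the channel.
            if (cos != null) {
                cos.flush();
            }
        }
    }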
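A note on the on-disk format: Message.writeTo() on its own writes the raw message bytes with no length prefix, so messages written back to back into one file with only writeTo() cannot be separated again on read (parsing the whole file would merge them into one message). Protobuf's delimited format, the one Message.writeDelimitedTo() writes and parseDelimitedFrom() reads, puts each message's size in front of it as a base-128 varint. The stdlib-only sketch below illustrates that framing on plain byte arrays; the DelimitedFraming class is hypothetical and is not part of protobuf or Beam.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of varint length-prefix framing on plain byte arrays.
class DelimitedFraming {
    // Appends value as an unsigned base-128 varint: 7 bits per byte, low bits first,
    // high bit set on every byte except the last.
    static void writeVarint(ByteArrayOutputStream out, int value) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    // Reads one varint; returns -1 if the stream is already at its end.
    static int readVarint(ByteArrayInputStream in) {
        int result = 0;
        for (int shift = 0; shift < 32; shift += 7) {
            int b = in.read();
            if (b == -1) {
                if (shift == 0) {
                    return -1;
                }
                throw new IllegalArgumentException("truncated varint");
            }
            result |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
        }
        throw new IllegalArgumentException("varint longer than 5 bytes");
    }

    // Writes each payload as <varint length><payload bytes>.
    static byte[] frame(List<byte[]> payloads) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] p : payloads) {
            writeVarint(out, p.length);
            out.write(p, 0, p.length);
        }
        return out.toByteArray();
    }

    // Splits a framed buffer back into the original payloads.
    static List<byte[]> unframe(byte[] data) {
        ByteArrayInputStream in = new ByteArrayInputStream(data);
        List<byte[]> payloads = new ArrayList<>();
        int len;
        while ((len = readVarint(in)) != -1) {
            if (in.available() < len) {
                throw new IllegalArgumentException("truncated payload");
            }
            byte[] p = new byte[len];
            in.read(p, 0, len);
            payloads.add(p);
        }
        return payloads;
    }

    public static void main(String[] args) {
        List<byte[]> msgs = List.of(new byte[200], new byte[] {1, 2, 3});
        byte[] framed = frame(msgs);
        // 200 needs a two-byte varint (0xC8 0x01), 3 needs one: 2 + 200 + 1 + 3 bytes.
        System.out.println(framed.length);  // 206
        List<byte[]> back = unframe(framed);
        System.out.println(back.size() == 2 && Arrays.equals(back.get(1), new byte[] {1, 2, 3}));  // true
    }
}
```

With real protobuf messages, the reading side would call parseDelimitedFrom() in a loop until it returns null at end of stream, after first decompressing the file, since the pipeline writes it with Compression.GZIP.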

The pipeline that I am using is this:

    p.apply(PubsubIO.readProtos(Billing.BillingMeasurement.class)
            .fromSubscription(input + "-billing.proto")
            .withIdAttribute("event_id"))
     .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1L))))
     .apply(FileIO.<Billing.BillingMeasurement>write()
            .via(new ProtobufFileIOSink<>())
            .withCompression(Compression.GZIP)
            .to(output + "/billing.proto/")
            .withSuffix(".pb"));
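With FixedWindows.of(Duration.standardMinutes(1L)) and the default zero offset, an element whose event timestamp is t milliseconds since the epoch falls into the window starting at t minus (t mod 60000). The stdlib-only sketch below reproduces that arithmetic (it is an illustration, not Beam's own code; the WindowMath class is hypothetical) and checks a property that matters for per-hour directories: because sixty one-minute windows tile an hour exactly, every such window lies inside a single UTC hour, so the hour derived from a window's start is the hour of every element in it.

```java
import java.time.Instant;

// Hypothetical helper mirroring zero-offset fixed-window assignment.
class WindowMath {
    static final long WINDOW_MILLIS = 60_000L;    // one-minute fixed windows
    static final long HOUR_MILLIS = 3_600_000L;   // one hour

    // Start of the fixed window (zero offset) that contains timestampMillis.
    static long windowStart(long timestampMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, WINDOW_MILLIS);
    }

    // True if the interval [start, start + WINDOW_MILLIS) lies inside one UTC hour.
    static boolean withinOneHour(long start) {
        long lastMillis = start + WINDOW_MILLIS - 1;  // last millisecond in the window
        return Math.floorDiv(start, HOUR_MILLIS) == Math.floorDiv(lastMillis, HOUR_MILLIS);
    }

    public static void main(String[] args) {
        long t = Instant.parse("2018-03-14T09:59:42.500Z").toEpochMilli();
        long start = windowStart(t);
        System.out.println(Instant.ofEpochMilli(start));  // 2018-03-14T09:59:00Z
        System.out.println(withinOneHour(start));        // true
    }
}
```

The same check would fail for a window length that does not divide an hour evenly (for example 7 minutes), where some windows straddle two hour directories.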

To make the output easier to sift through, I would like the files to be organized by year/month/day/hour, so that the paths look like:

gs://<bucket-name>/<prefixdir>/<year>/<month>/<day>/<hour>/[filename.pb.gz]
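The <year>/<month>/<day>/<hour> part of that layout can be computed from a window's start time. Below is a minimal stdlib sketch, assuming UTC hours and zero-padded fields; the HourlyPaths class and the file names are hypothetical. In Beam, a custom FileIO.Write.FileNaming receives the BoundedWindow in getFilename(...), and for FixedWindows that window is an IntervalWindow whose start() could be converted with java.time.Instant.ofEpochMilli(window.start().getMillis()) and passed to a helper like this; that wiring is an assumption and is not shown. The helper returns a relative path on the assumption that the .to(...) directory supplies the gs://<bucket-name>/<prefixdir> part.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: builds "<yyyy>/<MM>/<dd>/<HH>/<fileName>" from an instant.
class HourlyPaths {
    // Zero-padded year/month/day/hour, always evaluated in UTC.
    private static final DateTimeFormatter HOUR_DIR =
            DateTimeFormatter.ofPattern("yyyy/MM/dd/HH").withZone(ZoneOffset.UTC);

    // Relative path for a file belonging to the hour that contains windowStart.
    static String hourlyName(Instant windowStart, String fileName) {
        return HOUR_DIR.format(windowStart) + "/" + fileName;
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2018-03-14T09:59:00Z");
        System.out.println(hourlyName(start, "part-0.pb.gz"));
        // 2018/03/14/09/part-0.pb.gz
    }
}
```

Fixing the zone to UTC keeps directory names stable regardless of the worker machines' default time zone.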

I tried looking through FileIO.writeDynamic() and FileNaming, but I am not sure whether that is the right place. Is there an example or another implementation that someone can point me to?

— Ankur Chauhan