[jira] [Created] (FLINK-9751) Add a RecoverableWriter to the FileSystem abstraction


Stephan Ewen created FLINK-9751:
-----------------------------------

             Summary: Add a RecoverableWriter to the FileSystem abstraction
                 Key: FLINK-9751
                 URL: https://issues.apache.org/jira/browse/FLINK-9751
             Project: Flink
          Issue Type: Sub-task
    Affects Versions: 1.6.0
            Reporter: Stephan Ewen
            Assignee: Stephan Ewen


The core operation of the StreamingFileSink is to append result data to (hidden) "in progress" files and then, when the files should roll over, publish them as visible files. At each checkpoint, all data written so far must be persisted in the "in progress" files. On recovery, we either resume writing to the "in progress" file at the exact position recorded in the checkpoint, or publish the file with only the data up to that position.

To support various file systems and object stores, we need an interface that captures these core operations and allows for different implementations (such as file truncate/append on POSIX file systems, MultiPartUpload on S3, ...).

Proposed interface:
{code:java}
/**
 * A handle to an in-progress stream with a defined and persistent amount of data.
 * The handle can be used to recover the stream and publish the result file.
 */
interface CommitRecoverable { ... }

/**
 * A handle to an in-progress stream with a defined and persistent amount of data.
 * The handle can be used to recover the stream and either publish the result file
 * or keep appending data to the stream.
 */
interface ResumeRecoverable extends CommitRecoverable { ... }

/**
 * An output stream to a file system that can be recovered at well defined points.
 * The stream initially writes to hidden files or temp files and only creates the
 * target file once it is closed and "committed".
 */
public abstract class RecoverableFsDataOutputStream extends FSDataOutputStream {

    /**
     * Ensures all data so far is persistent (similar to {@link #sync()}) and returns
     * a handle to recover the stream at the current position.
     */
    public abstract ResumeRecoverable persist() throws IOException;

    /**
     * Closes the stream, ensuring persistence of all data (similar to {@link #sync()}).
     * This returns a Committer that can be used to publish (make visible) the file
     * that the stream was writing to.
     */
    public abstract Committer closeForCommit() throws IOException;

    /**
     * A committer can publish the file of a stream that was closed.
     * The Committer can be recovered via a {@link CommitRecoverable}.
     */
    public interface Committer {

        void commit() throws IOException;

        CommitRecoverable getRecoverable();
    }
}

/**
 * The RecoverableWriter creates and recovers RecoverableFsDataOutputStream.
 */
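public interface RecoverableWriter {

    /** Opens a new recoverable stream that will eventually be published at the given path. */
    RecoverableFsDataOutputStream open(Path path) throws IOException;

    /** Resumes an in-progress stream at the position captured by the given handle. */
    RecoverableFsDataOutputStream recover(ResumeRecoverable resumable) throws IOException;

    /** Recovers a Committer for a stream that was closed for commit but not yet published. */
    RecoverableFsDataOutputStream.Committer recoverForCommit(CommitRecoverable recoverable) throws IOException;
}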
{code}
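
To make the intended lifecycle concrete, here is a minimal sketch of the "file truncate/append" variant on a local file system. It is not the proposed Flink implementation: the class LocalRecoverableSketch, its nested Stream and Recoverable types, and the ".inprogress" temp-file naming are all hypothetical and chosen only for illustration. It assumes that truncating the hidden file to the persisted offset and then renaming it is an acceptable stand-in for "resume" and "publish".

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

/** Toy local-file sketch of the persist / recover / commit lifecycle (hypothetical, not Flink code). */
public class LocalRecoverableSketch {

    /** Hypothetical handle: hidden temp file, final target, and the persisted length. */
    public static final class Recoverable {
        final Path tempFile;
        final Path targetFile;
        final long offset;

        Recoverable(Path tempFile, Path targetFile, long offset) {
            this.tempFile = tempFile;
            this.targetFile = targetFile;
            this.offset = offset;
        }
    }

    /** Hypothetical stream that writes to a hidden temp file next to the target. */
    public static final class Stream {
        private final Path tempFile;
        private final Path targetFile;
        private final FileChannel channel;

        Stream(Path tempFile, Path targetFile, FileChannel channel) {
            this.tempFile = tempFile;
            this.targetFile = targetFile;
            this.channel = channel;
        }

        public void write(String data) throws IOException {
            ByteBuffer buf = ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8));
            while (buf.hasRemaining()) {
                channel.write(buf);
            }
        }

        /** Forces data to disk and returns a handle for the current position. */
        public Recoverable persist() throws IOException {
            channel.force(true);
            return new Recoverable(tempFile, targetFile, channel.position());
        }

        /** Persists, closes the stream, and returns the handle needed to publish the file. */
        public Recoverable closeForCommit() throws IOException {
            Recoverable recoverable = persist();
            channel.close();
            return recoverable;
        }

        public void close() throws IOException {
            channel.close();
        }
    }

    public static Stream open(Path target) throws IOException {
        Path temp = target.resolveSibling("." + target.getFileName() + ".inprogress");
        FileChannel ch = FileChannel.open(temp, StandardOpenOption.CREATE,
                StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
        return new Stream(temp, target, ch);
    }

    /** Resumes writing: drops everything after the persisted offset, then appends from there. */
    public static Stream recover(Recoverable r) throws IOException {
        FileChannel ch = FileChannel.open(r.tempFile, StandardOpenOption.WRITE);
        ch.truncate(r.offset);
        ch.position(r.offset);
        return new Stream(r.tempFile, r.targetFile, ch);
    }

    /** Publishes the file: truncates to the persisted offset, then renames it to the target. */
    public static void commit(Recoverable r) throws IOException {
        try (FileChannel ch = FileChannel.open(r.tempFile, StandardOpenOption.WRITE)) {
            ch.truncate(r.offset);
            ch.force(true);
        }
        Files.move(r.tempFile, r.targetFile, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path target = Files.createTempDirectory("recoverable-sketch").resolve("part-0");
        Stream out = open(target);
        out.write("checkpointed;");
        Recoverable checkpoint = out.persist();   // state captured at the checkpoint
        out.write("lost-after-failure");          // written after the checkpoint
        out.close();                              // simulate a failure

        Stream resumed = recover(checkpoint);     // data after the checkpoint is discarded
        resumed.write("resumed");
        commit(resumed.closeForCommit());
        System.out.println(new String(Files.readAllBytes(target), StandardCharsets.UTF_8));
        // prints "checkpointed;resumed"
    }
}
```

An object store without truncate or rename would need a different strategy behind the same interface (for example, tracking uploaded parts of a MultiPartUpload), which is why the recoverable handles above are kept opaque to the caller.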


