MultipleFileOutputFormat.java
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.io.impl.csv.indexed.functions;
import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
import org.apache.flink.api.common.io.InitializeOnMaster;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* The abstract base class for all output formats using multiple files.
* This format will (for each record to write) determine a subdirectory, create a
* new OutputFormat for that directory and write the record using the newly created format.
* Those formats will be created as soon as the first record is to be written to that directory
* and they will not be initialized automatically.
* <br>
* <i>Warning:</i> The {@link #configure(Configuration)}, {@link #open(int, int)},
* {@link #close()} and {@link #initializeGlobal(int)} methods will not invoke the respective
* methods of the new output formats, their parameters will just be stored as
* {@link #configuration}, {@link #taskNumber}, {@link #numTasks}, {@link #parallelism}
* respectively. Make sure to configure each output format in
* {@link #createFormatForDirectory(Path)}.
*
* @param <IT> The type of the records to write.
*/
public abstract class MultipleFileOutputFormat<IT>
implements OutputFormat<IT>, CleanupWhenUnsuccessful, InitializeOnMaster {
/**
* The configuration set with {@link #configure(Configuration)}.
*/
protected Configuration configuration;
/**
* The write mode of this format.
*/
protected FileSystem.WriteMode writeMode;
/**
* The number of this write task.
*/
protected int taskNumber;
/**
* The number of tasks used by the sink.
*/
protected int numTasks;
/**
* The parallelism of this format.
*/
protected int parallelism;
/**
* The root output directory.
*/
protected Path rootOutputPath;
/**
* Stores {@link OutputFormat}s used internally for each subdirectory determined by
* {@link #getDirectoryForRecord(Object)}.
*/
private Map<String, OutputFormat<IT>> formatsPerSubdirectory;
/**
* Creates a new output format with multiple output files.
*
* @param rootPath The root directory where all files will be stored.
*/
MultipleFileOutputFormat(Path rootPath) {
this.rootOutputPath = rootPath;
formatsPerSubdirectory = new HashMap<>();
}
@Override
public void close() throws IOException {
for (OutputFormat<IT> outputFormat : formatsPerSubdirectory.values()) {
outputFormat.close();
}
formatsPerSubdirectory.clear();
}
@Override
public void configure(Configuration parameters) {
this.configuration = parameters;
}
@Override
public void initializeGlobal(int parallelism) throws IOException {
this.parallelism = parallelism;
// Prepare root output directory
final FileSystem fs = rootOutputPath.getFileSystem();
if (fs.isDistributedFS()) {
if (!fs.initOutPathDistFS(rootOutputPath, writeMode, true)) {
throw new IOException("Failed to initialize output root directory: " + rootOutputPath);
}
} else {
if (writeMode == FileSystem.WriteMode.OVERWRITE) {
try {
fs.delete(rootOutputPath, true);
} catch (IOException e) {
throw new IOException("Could not remove existing output root directory: " +
rootOutputPath, e);
}
}
if (!fs.initOutPathLocalFS(rootOutputPath, writeMode, true)) {
throw new IOException("Failed to initialize output root directory: " + rootOutputPath);
}
}
}
@Override
public void open(int taskNumber, int numTasks) {
this.taskNumber = taskNumber;
this.numTasks = numTasks;
}
/**
* Set the write mode of this format.
*
* @param writeMode The new write mode.
*/
public void setWriteMode(FileSystem.WriteMode writeMode) {
this.writeMode = writeMode;
}
@Override
public void tryCleanupOnError() throws Exception {
for (OutputFormat<IT> outputFormat : formatsPerSubdirectory.values()) {
if (outputFormat instanceof CleanupWhenUnsuccessful) {
((CleanupWhenUnsuccessful) outputFormat).tryCleanupOnError();
}
}
rootOutputPath.getFileSystem().delete(rootOutputPath, false);
}
@Override
public void writeRecord(IT record) throws IOException {
String subDirectory = getDirectoryForRecord(record);
OutputFormat<IT> format;
if (formatsPerSubdirectory.containsKey(subDirectory)) {
format = formatsPerSubdirectory.get(subDirectory);
} else {
format = createFormatForDirectory(new Path(rootOutputPath, subDirectory));
format.open(taskNumber, numTasks);
formatsPerSubdirectory.put(subDirectory, format);
}
format.writeRecord(record);
}
/**
* Create or load an {@link OutputFormat} to use for a specific directory.
* The directory should be a subdirectory of the root directory of this output format.
*
* @param directory The (sub-)directory where the output format operates.
* @return The output format used to write in that directory.
* @throws IOException when the initialization of the new format fails.
*/
protected abstract OutputFormat<IT> createFormatForDirectory(Path directory) throws IOException;
/**
* Get the appropriate subdirectory for a record.
* This is expected to return a relative path from the root directory.
*
* @param record The record to write.
* @return The output directory to store the record in.
*/
protected abstract String getDirectoryForRecord(IT record);
/**
* Replace illegal filename characters ({@code <, >, :, ", /, \, |, ?, *}) with {@code '_'}
* and change the string to lower case.
*
* @param filename filename to be cleaned
* @return cleaned filename
*/
public static String cleanFilename(String filename) {
return filename.replaceAll("[<>:\"/\\\\|?*]", "_").toLowerCase();
}
}