MetaDataSource.java
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.io.api.metadata;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.hadoop.conf.Configuration;
import org.gradoop.common.model.api.entities.Element;
import org.gradoop.common.model.impl.metadata.MetaData;
import org.gradoop.common.model.impl.metadata.PropertyMetaData;
import org.gradoop.flink.io.api.metadata.functions.ElementToPropertyMetaData;
import org.gradoop.flink.io.api.metadata.functions.ReducePropertyMetaData;
import org.gradoop.flink.model.api.epgm.BaseGraph;
import org.gradoop.flink.model.api.epgm.BaseGraphCollection;
import org.gradoop.flink.util.GradoopFlinkConfig;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
/**
 * Base interface for factories that create metadata objects from tuples and files (distributed
 * or locally).
 *
 * @param <M> meta data type
 */
public interface MetaDataSource<M extends MetaData> {
  /**
   * Used to tag a graph head entity.
   */
  String GRAPH_TYPE = "g";
  /**
   * Used to tag a vertex entity.
   */
  String VERTEX_TYPE = "v";
  /**
   * Used to tag an edge entity.
   */
  String EDGE_TYPE = "e";

  /**
   * Creates the meta data for the given graph. The result is the union of the meta data tuples
   * of the graph's vertices, edges and graph head, mirroring
   * {@link #tuplesFromCollection(BaseGraphCollection)}.
   *
   * @param graph logical graph
   * @return meta data information
   */
  default DataSet<Tuple3<String, String, String>> tuplesFromGraph(BaseGraph<?, ?, ?, ?, ?> graph) {
    return tuplesFromElements(graph.getVertices())
      .union(tuplesFromElements(graph.getEdges()))
      // The graph head is an element as well; without it no "g" meta data would be produced.
      .union(tuplesFromElements(graph.getGraphHead()));
  }

  /**
   * Creates the meta data for the given graph collection. The result is the union of the meta
   * data tuples of the collection's vertices, edges and graph heads.
   *
   * @param graphs graph collection
   * @return meta data information
   */
  default DataSet<Tuple3<String, String, String>> tuplesFromCollection(
    BaseGraphCollection<?, ?, ?, ?, ?> graphs) {
    return tuplesFromElements(graphs.getVertices())
      .union(tuplesFromElements(graphs.getEdges()))
      .union(tuplesFromElements(graphs.getGraphHeads()));
  }

  /**
   * Creates the meta data for the specified data set of elements.
   * <p>
   * Each element is mapped with {@link ElementToPropertyMetaData}, the mapped tuples are grouped
   * on their first two fields and combined with {@link ReducePropertyMetaData}. The entries of
   * the third field are then sorted and joined into a single string using
   * {@link PropertyMetaData#PROPERTY_DELIMITER}.
   *
   * @param elements elements
   * @param <E> element type
   * @return meta data information
   */
  static <E extends Element> DataSet<Tuple3<String, String, String>> tuplesFromElements(
    DataSet<E> elements) {
    return elements
      .map(new ElementToPropertyMetaData<>())
      .groupBy(0, 1)
      .reduce(new ReducePropertyMetaData())
      .map(tuple -> Tuple3.of(
        tuple.f0,
        tuple.f1,
        // Sorting before joining keeps the produced meta data string independent of the
        // iteration order of the reduced entries.
        tuple.f2.stream().sorted().collect(
          Collectors.joining(PropertyMetaData.PROPERTY_DELIMITER))))
      // The lambda's result type is erased, so the tuple type has to be declared explicitly.
      .returns(new TupleTypeInfo<>(
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO))
      // The first two fields are passed through unchanged by the final map.
      .withForwardedFields("f0", "f1");
  }

  /**
   * Creates a {@link MetaData} object from the specified lines. The specified tuple is already
   * separated into the label and the metadata.
   *
   * @param metaDataStrings (element prefix (g,v,e), label, meta data) tuples
   * @return Meta Data object
   */
  M fromTuples(List<Tuple3<String, String, String>> metaDataStrings);

  /**
   * Reads the meta data from a specified file. Each line is split into a
   * (element prefix, label, metadata) tuple and put into a dataset. The latter can be used to
   * broadcast the metadata to the mappers.
   *
   * @param path path to metadata csv file
   * @param config gradoop configuration
   * @return (element prefix (g, v, e), label, metadata) tuple dataset
   */
  DataSet<Tuple3<String, String, String>> readDistributed(String path, GradoopFlinkConfig config);

  /**
   * Reads the meta data from a specified csv file. The file can be either located in a local file
   * system or in HDFS.
   *
   * @param path path to metadata csv file
   * @param hdfsConfig file system configuration
   * @return meta data
   * @throws IOException on failure
   */
  M readLocal(String path, Configuration hdfsConfig) throws IOException;
}