TxCollectionLayoutFactory.java
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.layouts.transactional;
import com.google.common.collect.Lists;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.pojo.EPGMEdge;
import org.gradoop.common.model.impl.pojo.EPGMGraphHead;
import org.gradoop.common.model.impl.pojo.EPGMVertex;
import org.gradoop.common.model.impl.pojo.EPGMGraphElement;
import org.gradoop.common.util.GradoopConstants;
import org.gradoop.flink.model.api.layouts.GraphCollectionLayout;
import org.gradoop.flink.model.api.layouts.GraphCollectionLayoutFactory;
import org.gradoop.flink.model.api.layouts.LogicalGraphLayout;
import org.gradoop.flink.model.impl.functions.bool.False;
import org.gradoop.flink.model.impl.functions.epgm.GraphElementExpander;
import org.gradoop.flink.model.impl.functions.epgm.GraphVerticesEdges;
import org.gradoop.flink.model.impl.functions.epgm.Id;
import org.gradoop.flink.model.impl.functions.epgm.TransactionFromSets;
import org.gradoop.flink.model.impl.functions.utils.Cast;
import org.gradoop.flink.model.impl.layouts.common.BaseFactory;
import org.gradoop.flink.model.impl.layouts.transactional.tuples.GraphTransaction;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/**
 * Responsible for producing instances of {@link TxCollectionLayout}.
 * <p>
 * Every factory method ends up wrapping a {@code DataSet<GraphTransaction>} in a
 * {@link TxCollectionLayout}. Methods that receive separate graph head, vertex and edge
 * inputs first assemble the graph transactions from those inputs.
 */
public class TxCollectionLayoutFactory extends BaseFactory
  implements GraphCollectionLayoutFactory<EPGMGraphHead, EPGMVertex, EPGMEdge> {

  /**
   * Creates a collection layout from graph heads and vertices only; an empty edge data set
   * is used for the edges.
   *
   * @param graphHeads graph head data set, must not be {@code null}
   * @param vertices   vertex data set, must not be {@code null}
   * @return transactional collection layout
   * @throws NullPointerException if one of the arguments is {@code null}
   */
  @Override
  public GraphCollectionLayout<EPGMGraphHead, EPGMVertex, EPGMEdge> fromDataSets(
    DataSet<EPGMGraphHead> graphHeads, DataSet<EPGMVertex> vertices) {
    Objects.requireNonNull(graphHeads, "graphHeads must not be null");
    Objects.requireNonNull(vertices, "vertices must not be null");
    return fromDataSets(graphHeads, vertices,
      createEdgeDataSet(Lists.newArrayListWithCapacity(0)));
  }

  /**
   * Creates a collection layout by grouping vertices and edges per graph id and joining the
   * result with the graph heads.
   * <p>
   * A graph head built from {@link GradoopConstants#DB_GRAPH_ID} and
   * {@link GradoopConstants#DB_GRAPH_LABEL} is always added to the given graph heads.
   *
   * @param inGraphHeads graph head data set, must not be {@code null}
   * @param inVertices   vertex data set, must not be {@code null}
   * @param inEdges      edge data set, must not be {@code null}
   * @return transactional collection layout
   * @throws NullPointerException if one of the arguments is {@code null}
   */
  @Override
  public GraphCollectionLayout<EPGMGraphHead, EPGMVertex, EPGMEdge> fromDataSets(
    DataSet<EPGMGraphHead> inGraphHeads,
    DataSet<EPGMVertex> inVertices,
    DataSet<EPGMEdge> inEdges) {
    Objects.requireNonNull(inGraphHeads, "inGraphHeads must not be null");
    Objects.requireNonNull(inVertices, "inVertices must not be null");
    Objects.requireNonNull(inEdges, "inEdges must not be null");

    // Add a dummy graph head for entities which have no assigned graph
    DataSet<EPGMGraphHead> dbGraphHead = getConfig().getExecutionEnvironment().fromElements(
      getGraphHeadFactory()
        .initGraphHead(GradoopConstants.DB_GRAPH_ID, GradoopConstants.DB_GRAPH_LABEL)
    );
    DataSet<EPGMGraphHead> allGraphHeads = inGraphHeads.union(dbGraphHead);

    DataSet<Tuple2<GradoopId, EPGMGraphElement>> vertices = expandByGraph(inVertices);
    DataSet<Tuple2<GradoopId, EPGMGraphElement>> edges = expandByGraph(inEdges);

    // Collect the vertex set and edge set per key (field 0); the combine step pre-aggregates
    // before the final group reduce.
    DataSet<Tuple3<GradoopId, Set<EPGMVertex>, Set<EPGMEdge>>> transactions = vertices
      .union(edges)
      .groupBy(0)
      .combineGroup(new GraphVerticesEdges())
      .groupBy(0)
      .reduceGroup(new GraphVerticesEdges());

    // Left outer join: every graph head is handed to TransactionFromSets, also those without
    // a matching vertex/edge tuple.
    DataSet<GraphTransaction> graphTransactions = allGraphHeads
      .leftOuterJoin(transactions)
      .where(new Id<>()).equalTo(0)
      .with(new TransactionFromSets());

    return new TxCollectionLayout(graphTransactions);
  }

  /**
   * Creates a collection layout from indexed data sets. The data sets of each map are united
   * into a single data set; the map keys are not used.
   *
   * @param graphHeads indexed graph head data sets, must not be {@code null}
   * @param vertices   indexed vertex data sets, must not be {@code null}
   * @param edges      indexed edge data sets, must not be {@code null}
   * @return transactional collection layout
   * @throws NullPointerException if one of the maps is {@code null}
   * @throws RuntimeException     if one of the maps contains no data set
   */
  @Override
  public GraphCollectionLayout<EPGMGraphHead, EPGMVertex, EPGMEdge> fromIndexedDataSets(
    Map<String, DataSet<EPGMGraphHead>> graphHeads,
    Map<String, DataSet<EPGMVertex>> vertices,
    Map<String, DataSet<EPGMEdge>> edges) {
    Objects.requireNonNull(graphHeads, "graphHeads must not be null");
    Objects.requireNonNull(vertices, "vertices must not be null");
    Objects.requireNonNull(edges, "edges must not be null");
    return fromDataSets(
      unionAll(graphHeads, "Error during graph head union"),
      unionAll(vertices, "Error during vertex union"),
      unionAll(edges, "Error during edge union")
    );
  }

  /**
   * Creates a collection layout from Java collections by turning each collection into a
   * data set first.
   *
   * @param graphHeads graph heads, must not be {@code null}
   * @param vertices   vertices, must not be {@code null}
   * @param edges      edges, must not be {@code null}
   * @return transactional collection layout
   * @throws NullPointerException if one of the collections is {@code null}
   */
  @Override
  public GraphCollectionLayout<EPGMGraphHead, EPGMVertex, EPGMEdge> fromCollections(
    Collection<EPGMGraphHead> graphHeads,
    Collection<EPGMVertex> vertices,
    Collection<EPGMEdge> edges) {
    Objects.requireNonNull(graphHeads, "graphHeads must not be null");
    Objects.requireNonNull(vertices, "vertices must not be null");
    Objects.requireNonNull(edges, "edges must not be null");
    return fromDataSets(
      createGraphHeadDataSet(graphHeads),
      createVertexDataSet(vertices),
      createEdgeDataSet(edges)
    );
  }

  /**
   * Creates a collection layout from the graph head, vertices and edges of a logical graph
   * layout.
   *
   * @param logicalGraphLayout logical graph layout, must not be {@code null}
   * @return transactional collection layout
   * @throws NullPointerException if the layout is {@code null}
   */
  @Override
  public GraphCollectionLayout<EPGMGraphHead, EPGMVertex, EPGMEdge> fromGraphLayout(
    LogicalGraphLayout<EPGMGraphHead, EPGMVertex, EPGMEdge> logicalGraphLayout) {
    Objects.requireNonNull(logicalGraphLayout, "logicalGraphLayout must not be null");
    return fromDataSets(logicalGraphLayout.getGraphHead(),
      logicalGraphLayout.getVertices(),
      logicalGraphLayout.getEdges());
  }

  /**
   * Wraps the given graph transactions in a collection layout.
   *
   * @param transactions graph transactions, must not be {@code null}
   * @return transactional collection layout
   * @throws NullPointerException if {@code transactions} is {@code null}
   */
  @Override
  public GraphCollectionLayout<EPGMGraphHead, EPGMVertex, EPGMEdge> fromTransactions(
    DataSet<GraphTransaction> transactions) {
    Objects.requireNonNull(transactions, "transactions must not be null");
    return new TxCollectionLayout(transactions);
  }

  /**
   * Wraps the given graph transactions in a collection layout.
   * <p>
   * The two merge reducers are not used by this factory: the transactions are stored as
   * they are, so this method behaves exactly like {@link #fromTransactions(DataSet)}.
   *
   * @param transactions       graph transactions, must not be {@code null}
   * @param vertexMergeReducer ignored by this implementation
   * @param edgeMergeReducer   ignored by this implementation
   * @return transactional collection layout
   * @throws NullPointerException if {@code transactions} is {@code null}
   */
  @Override
  public GraphCollectionLayout<EPGMGraphHead, EPGMVertex, EPGMEdge> fromTransactions(
    DataSet<GraphTransaction> transactions,
    GroupReduceFunction<EPGMVertex, EPGMVertex> vertexMergeReducer,
    GroupReduceFunction<EPGMEdge, EPGMEdge> edgeMergeReducer) {
    return fromTransactions(transactions);
  }

  /**
   * Creates a collection layout backed by an empty graph transaction data set.
   *
   * @return empty transactional collection layout
   */
  @Override
  public GraphCollectionLayout<EPGMGraphHead, EPGMVertex, EPGMEdge> createEmptyCollection() {
    return fromTransactions(createGraphTransactionDataSet(Lists.newArrayListWithCapacity(0)));
  }

  /**
   * Casts the given elements to {@link EPGMGraphElement} and applies
   * {@link GraphElementExpander} to them, producing (graph id, element) pairs.
   *
   * @param elements vertices or edges
   * @param <T>      concrete graph element type
   * @return data set of (graph id, element) pairs
   */
  private <T extends EPGMGraphElement> DataSet<Tuple2<GradoopId, EPGMGraphElement>> expandByGraph(
    DataSet<T> elements) {
    return elements
      .map(new Cast<>(EPGMGraphElement.class))
      .returns(TypeExtractor.getForClass(EPGMGraphElement.class))
      .flatMap(new GraphElementExpander<>());
  }

  /**
   * Unites all data sets stored in the given map into one data set.
   *
   * @param indexedDataSets map whose values are united
   * @param errorMessage    message of the exception thrown if the map has no values
   * @param <T>             element type of the data sets
   * @return union of all map values
   * @throws RuntimeException if the map contains no data set
   */
  private static <T> DataSet<T> unionAll(Map<String, DataSet<T>> indexedDataSets,
    String errorMessage) {
    return indexedDataSets.values().stream()
      .reduce(DataSet::union)
      .orElseThrow(() -> new RuntimeException(errorMessage));
  }

  /**
   * Creates a dataset from a given (possibly empty) collection of graph transactions.
   *
   * @param transactions graph transactions
   * @return a dataset containing the given transactions
   */
  private DataSet<GraphTransaction> createGraphTransactionDataSet(
    Collection<GraphTransaction> transactions) {
    ExecutionEnvironment env = getConfig().getExecutionEnvironment();

    if (!transactions.isEmpty()) {
      return env.fromCollection(transactions);
    }

    // Empty input: build a data set from one placeholder transaction with explicit type
    // information and filter it out again, which yields an empty but typed data set.
    // NOTE(review): presumably needed because Flink rejects empty collections in
    // fromCollection - confirm against the Flink version in use.
    return env.fromCollection(Lists.newArrayList(new GraphTransaction()),
      new TypeHint<GraphTransaction>() { }.getTypeInfo())
      .filter(new False<>());
  }
}