GVECollectionLayoutFactory.java
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.layouts.gve;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.gradoop.common.model.impl.pojo.EPGMEdge;
import org.gradoop.common.model.impl.pojo.EPGMGraphHead;
import org.gradoop.common.model.impl.pojo.EPGMVertex;
import org.gradoop.flink.model.api.layouts.GraphCollectionLayout;
import org.gradoop.flink.model.api.layouts.GraphCollectionLayoutFactory;
import org.gradoop.flink.model.api.layouts.LogicalGraphLayout;
import org.gradoop.flink.model.impl.functions.epgm.Id;
import org.gradoop.flink.model.impl.functions.epgm.TransactionEdges;
import org.gradoop.flink.model.impl.functions.epgm.TransactionGraphHead;
import org.gradoop.flink.model.impl.functions.epgm.TransactionVertices;
import org.gradoop.flink.model.impl.functions.utils.First;
import org.gradoop.flink.model.impl.layouts.transactional.tuples.GraphTransaction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
/**
* Responsible for creating a {@link GVELayout} from given data.
*/
public class GVECollectionLayoutFactory extends GVEBaseFactory
implements GraphCollectionLayoutFactory<EPGMGraphHead, EPGMVertex, EPGMEdge> {
@Override
public GraphCollectionLayout<EPGMGraphHead, EPGMVertex, EPGMEdge> fromDataSets(
DataSet<EPGMGraphHead> graphHeads, DataSet<EPGMVertex> vertices) {
return fromDataSets(graphHeads, vertices,
createEdgeDataSet(new ArrayList<>(0)));
}
@Override
public GraphCollectionLayout<EPGMGraphHead, EPGMVertex, EPGMEdge> fromDataSets(
DataSet<EPGMGraphHead> graphHeads,
DataSet<EPGMVertex> vertices,
DataSet<EPGMEdge> edges) {
return create(graphHeads, vertices, edges);
}
@Override
public GraphCollectionLayout<EPGMGraphHead, EPGMVertex, EPGMEdge> fromIndexedDataSets(
Map<String, DataSet<EPGMGraphHead>> graphHeads,
Map<String, DataSet<EPGMVertex>> vertices,
Map<String, DataSet<EPGMEdge>> edges) {
return create(graphHeads, vertices, edges);
}
@Override
public GraphCollectionLayout<EPGMGraphHead, EPGMVertex, EPGMEdge> fromCollections(
Collection<EPGMGraphHead> graphHeads,
Collection<EPGMVertex> vertices,
Collection<EPGMEdge> edges) {
return fromDataSets(
createGraphHeadDataSet(Objects.requireNonNull(graphHeads, "EPGMGraphHead collection was null")),
createVertexDataSet(Objects.requireNonNull(vertices, "EPGMVertex collection was null")),
createEdgeDataSet(Objects.requireNonNull(edges, "EPGMEdge collection was null")));
}
@Override
public GraphCollectionLayout<EPGMGraphHead, EPGMVertex, EPGMEdge> fromGraphLayout(
LogicalGraphLayout<EPGMGraphHead, EPGMVertex, EPGMEdge> graph) {
return fromDataSets(graph.getGraphHead(), graph.getVertices(), graph.getEdges());
}
@Override
public GraphCollectionLayout<EPGMGraphHead, EPGMVertex, EPGMEdge> fromTransactions(
DataSet<GraphTransaction> transactions) {
GroupReduceFunction<EPGMVertex, EPGMVertex> vertexReducer = new First<>();
GroupReduceFunction<EPGMEdge, EPGMEdge> edgeReducer = new First<>();
return fromTransactions(transactions, vertexReducer, edgeReducer);
}
@Override
public GraphCollectionLayout<EPGMGraphHead, EPGMVertex, EPGMEdge> fromTransactions(
DataSet<GraphTransaction> transactions,
GroupReduceFunction<EPGMVertex, EPGMVertex> vertexMergeReducer,
GroupReduceFunction<EPGMEdge, EPGMEdge> edgeMergeReducer) {
DataSet<EPGMGraphHead> graphHeads = transactions.map(new TransactionGraphHead<>());
DataSet<EPGMVertex> vertices = transactions
.flatMap(new TransactionVertices<>())
.groupBy(new Id<>())
.reduceGroup(vertexMergeReducer);
DataSet<EPGMEdge> edges = transactions
.flatMap(new TransactionEdges<>())
.groupBy(new Id<>())
.reduceGroup(edgeMergeReducer);
return fromDataSets(graphHeads, vertices, edges);
}
@Override
public GraphCollectionLayout<EPGMGraphHead, EPGMVertex, EPGMEdge> createEmptyCollection() {
Collection<EPGMGraphHead> graphHeads = new ArrayList<>();
Collection<EPGMVertex> vertices = new ArrayList<>();
Collection<EPGMEdge> edges = new ArrayList<>();
return fromCollections(graphHeads, vertices, edges);
}
}