TemporalGraphCollectionFactory.java
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.temporal.model.impl;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.util.Preconditions;
import org.gradoop.common.model.api.entities.Edge;
import org.gradoop.common.model.api.entities.EdgeFactory;
import org.gradoop.common.model.api.entities.GraphHead;
import org.gradoop.common.model.api.entities.GraphHeadFactory;
import org.gradoop.common.model.api.entities.Vertex;
import org.gradoop.common.model.api.entities.VertexFactory;
import org.gradoop.flink.model.api.epgm.BaseGraphCollection;
import org.gradoop.flink.model.api.epgm.BaseGraphCollectionFactory;
import org.gradoop.flink.model.api.layouts.GraphCollectionLayout;
import org.gradoop.flink.model.api.layouts.GraphCollectionLayoutFactory;
import org.gradoop.flink.model.api.layouts.LogicalGraphLayout;
import org.gradoop.flink.model.impl.functions.epgm.Id;
import org.gradoop.flink.model.impl.layouts.transactional.tuples.GraphTransaction;
import org.gradoop.temporal.model.api.functions.TimeIntervalExtractor;
import org.gradoop.temporal.model.impl.functions.tpgm.EdgeToTemporalEdge;
import org.gradoop.temporal.model.impl.functions.tpgm.GraphHeadToTemporalGraphHead;
import org.gradoop.temporal.model.impl.functions.tpgm.VertexToTemporalVertex;
import org.gradoop.temporal.model.impl.layout.TemporalGraphCollectionLayoutFactory;
import org.gradoop.temporal.model.impl.pojo.TemporalEdge;
import org.gradoop.temporal.model.impl.pojo.TemporalGraphHead;
import org.gradoop.temporal.model.impl.pojo.TemporalVertex;
import org.gradoop.temporal.util.TemporalGradoopConfig;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
/**
* Responsible for creating instances of {@link TemporalGraphCollection} based on a specific layout.
*/
public class TemporalGraphCollectionFactory implements BaseGraphCollectionFactory<
TemporalGraphHead, TemporalVertex, TemporalEdge, TemporalGraph, TemporalGraphCollection> {
/**
* The factory to create a temporal layout.
*/
private GraphCollectionLayoutFactory<TemporalGraphHead, TemporalVertex, TemporalEdge> layoutFactory;
/**
* Temporal Gradoop config.
*/
private final TemporalGradoopConfig config;
/**
* Creates a new temporal graph collection factory instance.
*
* @param config the temporal Gradoop config.
*/
public TemporalGraphCollectionFactory(TemporalGradoopConfig config) {
this.config = Preconditions.checkNotNull(config);
this.layoutFactory = new TemporalGraphCollectionLayoutFactory();
this.layoutFactory.setGradoopFlinkConfig(config);
}
@Override
public GraphCollectionLayoutFactory<TemporalGraphHead, TemporalVertex, TemporalEdge> getLayoutFactory() {
return layoutFactory;
}
@Override
public void setLayoutFactory(GraphCollectionLayoutFactory<TemporalGraphHead, TemporalVertex,
TemporalEdge> factory) {
this.layoutFactory = factory;
}
/**
* Creates a temporal graph collection instance from an existing layout.
*
* @param layout the layout that is used to store the graph data of this temporal graph collection instance
* @return a temporal graph collection
*/
public TemporalGraphCollection fromLayout(
GraphCollectionLayout<TemporalGraphHead, TemporalVertex, TemporalEdge> layout) {
return new TemporalGraphCollection(layout, config);
}
@Override
public TemporalGraphCollection fromDataSets(DataSet<TemporalGraphHead> graphHeads,
DataSet<TemporalVertex> vertices) {
return new TemporalGraphCollection(layoutFactory.fromDataSets(graphHeads, vertices), config);
}
@Override
public TemporalGraphCollection fromDataSets(DataSet<TemporalGraphHead> graphHeads,
DataSet<TemporalVertex> vertices, DataSet<TemporalEdge> edges) {
return new TemporalGraphCollection(layoutFactory.fromDataSets(graphHeads, vertices, edges), config);
}
@Override
public TemporalGraphCollection fromIndexedDataSets(
Map<String, DataSet<TemporalGraphHead>> graphHeads,
Map<String, DataSet<TemporalVertex>> vertices,
Map<String, DataSet<TemporalEdge>> edges) {
return new TemporalGraphCollection(layoutFactory
.fromIndexedDataSets(graphHeads, vertices, edges), config);
}
@Override
public TemporalGraphCollection fromCollections(Collection<TemporalGraphHead> graphHeads,
Collection<TemporalVertex> vertices, Collection<TemporalEdge> edges) {
return new TemporalGraphCollection(layoutFactory.fromCollections(graphHeads, vertices, edges), config);
}
@Override
public TemporalGraphCollection fromGraph(
LogicalGraphLayout<TemporalGraphHead, TemporalVertex, TemporalEdge> logicalGraphLayout) {
return new TemporalGraphCollection(layoutFactory.fromGraphLayout(logicalGraphLayout), config);
}
@Override
public TemporalGraphCollection fromGraphs(
LogicalGraphLayout<TemporalGraphHead, TemporalVertex, TemporalEdge>... logicalGraphLayout) {
if (logicalGraphLayout.length == 0) {
return createEmptyCollection();
} else if (logicalGraphLayout.length == 1) {
return fromGraph(logicalGraphLayout[0]);
} else {
return fromDataSets(
Arrays.stream(logicalGraphLayout).map(LogicalGraphLayout::getGraphHead).reduce(DataSet::union)
.get().distinct(new Id<>()),
Arrays.stream(logicalGraphLayout).map(LogicalGraphLayout::getVertices).reduce(DataSet::union)
.get().distinct(new Id<>()),
Arrays.stream(logicalGraphLayout).map(LogicalGraphLayout::getEdges).reduce(DataSet::union)
.get().distinct(new Id<>()));
}
}
@Override
public TemporalGraphCollection fromTransactions(DataSet<GraphTransaction> transactions) {
throw new UnsupportedOperationException("This operation is not (yet) supported.");
}
@Override
public TemporalGraphCollection fromTransactions(DataSet<GraphTransaction> transactions,
GroupReduceFunction<TemporalVertex, TemporalVertex> vertexMergeReducer,
GroupReduceFunction<TemporalEdge, TemporalEdge> edgeMergeReducer) {
throw new UnsupportedOperationException("This operation is not (yet) supported.");
}
@Override
public TemporalGraphCollection createEmptyCollection() {
return new TemporalGraphCollection(layoutFactory.createEmptyCollection(), config);
}
@Override
public GraphHeadFactory<TemporalGraphHead> getGraphHeadFactory() {
return layoutFactory.getGraphHeadFactory();
}
@Override
public VertexFactory<TemporalVertex> getVertexFactory() {
return layoutFactory.getVertexFactory();
}
@Override
public EdgeFactory<TemporalEdge> getEdgeFactory() {
return layoutFactory.getEdgeFactory();
}
/**
* Creates a {@link TemporalGraphCollection} instance by the given graph head, vertex and edge datasets.
*
* @param graphHead graph head DataSet
* @param vertices vertex DataSet
* @param edges edge DataSet
* @return a temporal graph collection
*/
public TemporalGraphCollection fromNonTemporalDataSets(
DataSet<? extends GraphHead> graphHead,
DataSet<? extends Vertex> vertices,
DataSet<? extends Edge> edges) {
return new TemporalGraphCollection(this.layoutFactory.fromDataSets(
graphHead.map(new GraphHeadToTemporalGraphHead<>(getGraphHeadFactory())),
vertices.map(new VertexToTemporalVertex<>(getVertexFactory())),
edges.map(new EdgeToTemporalEdge<>(getEdgeFactory()))), config);
}
/**
* Creates a {@link TemporalGraphCollection} instance. By the provided timestamp extractors, it is
* possible to extract temporal information from the data to define a timestamp or time interval that
* represents the beginning and end of the element's validity (valid time).
*
* @param graphHead graph head DataSet
* @param graphHeadTimeIntervalExtractor extractor to pick the time interval from graph heads
* @param vertices vertex DataSet
* @param vertexTimeIntervalExtractor extractor to pick the time interval from vertices
* @param edges edge DataSet
* @param edgeTimeIntervalExtractor extractor to pick the time interval from edges
* @return the logical graph represented as temporal graph with defined valid times
* @param <G> The graph head type.
* @param <V> The vertex type.
* @param <E> The edge type.
*/
public <
G extends GraphHead,
V extends Vertex,
E extends Edge> TemporalGraphCollection fromNonTemporalDataSets(
DataSet<G> graphHead,
TimeIntervalExtractor<G> graphHeadTimeIntervalExtractor,
DataSet<V> vertices,
TimeIntervalExtractor<V> vertexTimeIntervalExtractor,
DataSet<E> edges,
TimeIntervalExtractor<E> edgeTimeIntervalExtractor) {
return new TemporalGraphCollection(this.layoutFactory.fromDataSets(
graphHead.map(new GraphHeadToTemporalGraphHead<>(getGraphHeadFactory(),
graphHeadTimeIntervalExtractor)),
vertices.map(new VertexToTemporalVertex<>(getVertexFactory(),
vertexTimeIntervalExtractor)),
edges.map(new EdgeToTemporalEdge<>(getEdgeFactory(),
edgeTimeIntervalExtractor))), config);
}
/**
* Creates a {@link TemporalGraphCollection} from a (non-temporal) base graph collection.
*
* This method calls {@link #fromNonTemporalDataSets(DataSet, DataSet, DataSet)} on the graph collections
* element data sets.
*
* @param collection Some base graph collection.
* @return The resulting temporal graph collection.
*/
public TemporalGraphCollection fromNonTemporalGraphCollection(
BaseGraphCollection<?, ?, ?, ?, ?> collection) {
return fromNonTemporalDataSets(
collection.getGraphHeads(), collection.getVertices(), collection.getEdges());
}
}