TemporalBaseLayoutFactory.java
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.temporal.model.impl.layout;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.gradoop.common.model.api.entities.EdgeFactory;
import org.gradoop.common.model.api.entities.GraphHeadFactory;
import org.gradoop.common.model.api.entities.VertexFactory;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.flink.model.api.layouts.BaseLayoutFactory;
import org.gradoop.flink.model.impl.functions.bool.False;
import org.gradoop.flink.util.GradoopFlinkConfig;
import org.gradoop.temporal.model.impl.pojo.TemporalEdge;
import org.gradoop.temporal.model.impl.pojo.TemporalEdgeFactory;
import org.gradoop.temporal.model.impl.pojo.TemporalGraphHead;
import org.gradoop.temporal.model.impl.pojo.TemporalGraphHeadFactory;
import org.gradoop.temporal.model.impl.pojo.TemporalVertex;
import org.gradoop.temporal.model.impl.pojo.TemporalVertexFactory;
import org.gradoop.temporal.util.TemporalGradoopConfig;
import java.util.Collection;
import java.util.function.Supplier;
import static java.util.Objects.requireNonNull;
/**
* A base class for temporal layout factories.
*/
class TemporalBaseLayoutFactory implements
BaseLayoutFactory<TemporalGraphHead, TemporalVertex, TemporalEdge> {
/**
* The temporal config.
*/
private TemporalGradoopConfig config;
/**
* The graph head factory.
*/
private final GraphHeadFactory<TemporalGraphHead> graphHeadFactory;
/**
* The vertex factory.
*/
private final VertexFactory<TemporalVertex> vertexFactory;
/**
* The edge factory.
*/
private final EdgeFactory<TemporalEdge> edgeFactory;
/**
* Initialize this factory.
*/
TemporalBaseLayoutFactory() {
graphHeadFactory = new TemporalGraphHeadFactory();
vertexFactory = new TemporalVertexFactory();
edgeFactory = new TemporalEdgeFactory();
}
/**
* Create a {@link DataSet} from a possible empty collection.
*
* @param collection The collection of elements.
* @param dummyFactory A factory used to create a dummy element (in case the collection is empty).
* @param <E> The type of the elements.
* @return A dataset containing the elements of the collection.
*/
protected <E> DataSet<E> createDataSet(Collection<E> collection, Supplier<E> dummyFactory) {
requireNonNull(collection, "Element collection was null");
requireNonNull(dummyFactory, "Dummy element provider was null");
final ExecutionEnvironment executionEnvironment = getConfig().getExecutionEnvironment();
if (collection.isEmpty()) {
return executionEnvironment.fromElements(dummyFactory.get())
.filter(new False<>());
} else {
return executionEnvironment.fromCollection(collection);
}
}
/**
* Create a {@link DataSet} from a possibly empty collection of {@link TemporalGraphHead graph heads}.
*
* @param heads The collection of graph heads.
* @return The dataset of graph heads.
*/
protected DataSet<TemporalGraphHead> createGraphHeadDataSet(Collection<TemporalGraphHead> heads) {
return createDataSet(heads, () -> getGraphHeadFactory().createGraphHead());
}
/**
* Create a {@link DataSet} from a possibly empty collection of {@link TemporalVertex vertices}.
*
* @param vertices The collection of vertices.
* @return The dataset of vertices.
*/
protected DataSet<TemporalVertex> createVertexDataSet(Collection<TemporalVertex> vertices) {
return createDataSet(vertices, () -> getVertexFactory().createVertex());
}
/**
* Create a {@link DataSet} from a possibly empty collection of {@link TemporalEdge edges}.
*
* @param edges The collection of edges.
* @return The dataset of edges.
*/
protected DataSet<TemporalEdge> createEdgeDataSet(Collection<TemporalEdge> edges) {
return createDataSet(edges,
() -> getEdgeFactory().createEdge(GradoopId.NULL_VALUE, GradoopId.NULL_VALUE));
}
/**
* Get the temporal config to be used by this factory.
*
* @return The config.
* @throws IllegalStateException when the config has not been set yet.
*/
protected TemporalGradoopConfig getConfig() {
if (config != null) {
return config;
}
throw new IllegalStateException("No config is set for this factory.");
}
@Override
public void setGradoopFlinkConfig(GradoopFlinkConfig config) {
if (config instanceof TemporalGradoopConfig) {
this.config = (TemporalGradoopConfig) config;
} else {
throw new IllegalArgumentException("The config has to be an instance of [" +
TemporalGradoopConfig.class.getSimpleName() + "].");
}
}
@Override
public GraphHeadFactory<TemporalGraphHead> getGraphHeadFactory() {
return graphHeadFactory;
}
@Override
public VertexFactory<TemporalVertex> getVertexFactory() {
return vertexFactory;
}
@Override
public EdgeFactory<TemporalEdge> getEdgeFactory() {
return edgeFactory;
}
}