Transformation.java
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.transformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.gradoop.common.model.api.entities.GraphHead;
import org.gradoop.common.model.api.entities.Edge;
import org.gradoop.common.model.api.entities.Vertex;
import org.gradoop.flink.model.api.epgm.BaseGraph;
import org.gradoop.flink.model.api.epgm.BaseGraphCollection;
import org.gradoop.flink.model.api.epgm.BaseGraphFactory;
import org.gradoop.flink.model.api.functions.TransformationFunction;
import org.gradoop.flink.model.api.operators.UnaryBaseGraphToBaseGraphOperator;
import org.gradoop.flink.model.impl.operators.transformation.functions.TransformEdge;
import org.gradoop.flink.model.impl.operators.transformation.functions.TransformGraphHead;
import org.gradoop.flink.model.impl.operators.transformation.functions.TransformVertex;
/**
* The modification operators is a unary graph operator that takes a base
* graph as input and applies user defined modification functions on the
* elements of that graph as well as on its graph head.
*
* The identity of the elements is preserved.
*
* @param <G> The graph head type.
* @param <V> The vertex type.
* @param <E> The edge type.
* @param <LG> The type of the graph.
* @param <GC> The type of the graph collection.
*/
public class Transformation<
G extends GraphHead,
V extends Vertex,
E extends Edge,
LG extends BaseGraph<G, V, E, LG, GC>,
GC extends BaseGraphCollection<G, V, E, LG, GC>>
implements UnaryBaseGraphToBaseGraphOperator<LG> {
/**
* Modification function for graph heads
*/
protected final TransformationFunction<G> graphHeadTransFunc;
/**
* Modification function for vertices
*/
protected final TransformationFunction<V> vertexTransFunc;
/**
* Modification function for edges
*/
protected final TransformationFunction<E> edgeTransFunc;
/**
* Creates a new operator instance.
*
* @param graphHeadTransFunc graph head transformation function
* @param vertexTransFunc vertex transformation function
* @param edgeTransFunc edge transformation function
*/
public Transformation(
TransformationFunction<G> graphHeadTransFunc,
TransformationFunction<V> vertexTransFunc,
TransformationFunction<E> edgeTransFunc) {
if (graphHeadTransFunc == null && vertexTransFunc == null && edgeTransFunc == null) {
throw new IllegalArgumentException("Provide at least one transformation function.");
}
this.graphHeadTransFunc = graphHeadTransFunc;
this.vertexTransFunc = vertexTransFunc;
this.edgeTransFunc = edgeTransFunc;
}
@Override
public LG execute(LG graph) {
return executeInternal(
graph.getGraphHead(),
graph.getVertices(),
graph.getEdges(),
graph.getFactory());
}
/**
* Applies the transformation functions on the given datasets.
*
* @param graphHeads graph heads
* @param vertices vertices
* @param edges edges
* @param factory the factory that is responsible for creating an instance of the base graph
* @return transformed base graph
*/
protected LG executeInternal(DataSet<G> graphHeads, DataSet<V> vertices, DataSet<E> edges,
BaseGraphFactory<G, V, E, LG, GC> factory) {
DataSet<G> transformedGraphHeads = graphHeadTransFunc != null ? graphHeads
.map(new TransformGraphHead<>(graphHeadTransFunc, factory.getGraphHeadFactory()))
.returns(TypeExtractor.createTypeInfo(factory.getGraphHeadFactory().getType())) : graphHeads;
DataSet<V> transformedVertices = vertexTransFunc != null ? vertices
.map(new TransformVertex<>(vertexTransFunc, factory.getVertexFactory()))
.returns(TypeExtractor.createTypeInfo(factory.getVertexFactory().getType())) : vertices;
DataSet<E> transformedEdges = edgeTransFunc != null ? edges
.map(new TransformEdge<>(edgeTransFunc, factory.getEdgeFactory()))
.returns(TypeExtractor.createTypeInfo(factory.getEdgeFactory().getType())) : edges;
return factory.fromDataSets(transformedGraphHeads, transformedVertices, transformedEdges);
}
}