ExpandEmbeddingsNode.java
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.matching.single.cypher.planning.queryplan.binary;
import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.java.DataSet;
import org.gradoop.flink.model.impl.operators.matching.common.MatchStrategy;
import org.gradoop.flink.model.impl.operators.matching.single.cypher.utils.ExpandDirection;
import org.gradoop.flink.model.impl.operators.matching.single.cypher.pojos.Embedding;
import org.gradoop.flink.model.impl.operators.matching.single.cypher.pojos.EmbeddingMetaData;
import org.gradoop.flink.model.impl.operators.matching.single.cypher.operators.expand
.ExpandEmbeddings;
import org.gradoop.flink.model.impl.operators.matching.single.cypher.operators.expand.ExpandEmbeddingsBulk;
import org.gradoop.flink.model.impl.operators.matching.single.cypher.planning.queryplan.BinaryNode;
import org.gradoop.flink.model.impl.operators.matching.single.cypher.planning.queryplan.JoinNode;
import org.gradoop.flink.model.impl.operators.matching.single.cypher.planning.queryplan.PlanNode;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
/**
* Binary node that wraps an {@link ExpandEmbeddingsBulk} operator.
*/
public class ExpandEmbeddingsNode extends BinaryNode implements JoinNode {
/**
* Column to expand the embedding from.
*/
private final int expandColumn;
/**
* Query variable of the first vertex in the path
*/
private final String startVariable;
/**
* Query variable of the variable length path
*/
private final String pathVariable;
/**
* Query variable of the last vertex in the path
*/
private final String endVariable;
/**
* Minimum number of path expansion steps
*/
private final int lowerBound;
/**
* Maximum number of path expansion steps
*/
private final int upperBound;
/**
* Column that contains the final vertex of the expansion
*/
private final int closingColumn;
/**
* Direction in which to expand the embedding
*/
private final ExpandDirection expandDirection;
/**
* Morphism type for vertices
*/
private final MatchStrategy vertexStrategy;
/**
* Morphism type for edges
*/
private final MatchStrategy edgeStrategy;
/**
* Creates a new node.
*
* @param leftChild left child representing the embeddings to expand
* @param rightChild right child representing the edges to expand with
* @param startVariable vertex variable on which to start the expansion
* @param pathVariable variable representing the path
* @param endVariable vertex variable on which to end the expansion
* @param lowerBound minimum number of expansions
* @param upperBound maximum number of expansions
* @param expandDirection edge direction in the expansion
* @param vertexStrategy morphism strategy for vertices
* @param edgeStrategy morphism strategy for edges
*/
public ExpandEmbeddingsNode(PlanNode leftChild, PlanNode rightChild,
String startVariable, String pathVariable, String endVariable,
int lowerBound, int upperBound, ExpandDirection expandDirection,
MatchStrategy vertexStrategy, MatchStrategy edgeStrategy) {
super(leftChild, rightChild);
this.pathVariable = pathVariable;
this.startVariable = startVariable;
this.endVariable = endVariable;
this.lowerBound = lowerBound;
this.upperBound = upperBound == 0 ? Integer.MAX_VALUE : upperBound;
this.expandDirection = expandDirection;
this.vertexStrategy = vertexStrategy;
this.edgeStrategy = edgeStrategy;
this.expandColumn = leftChild.getEmbeddingMetaData().getEntryColumn(startVariable);
this.closingColumn = leftChild.getEmbeddingMetaData().containsEntryColumn(endVariable) ?
leftChild.getEmbeddingMetaData().getEntryColumn(endVariable) : -1;
}
@Override
public DataSet<Embedding> execute() {
ExpandEmbeddings op = new ExpandEmbeddingsBulk(
getLeftChild().execute(), getRightChild().execute(),
expandColumn, lowerBound, upperBound, expandDirection,
getDistinctVertexColumns(getLeftChild().getEmbeddingMetaData()),
getDistinctEdgeColumns(getLeftChild().getEmbeddingMetaData()),
closingColumn, JoinOperatorBase.JoinHint.OPTIMIZER_CHOOSES);
op.setName(toString());
return op.evaluate();
}
@Override
protected EmbeddingMetaData computeEmbeddingMetaData() {
EmbeddingMetaData inputMetaData = getLeftChild().getEmbeddingMetaData();
EmbeddingMetaData metaData = new EmbeddingMetaData(inputMetaData);
metaData.setEntryColumn(pathVariable, EmbeddingMetaData.EntryType.PATH,
inputMetaData.getEntryCount());
metaData.setDirection(pathVariable, expandDirection);
if (!inputMetaData.containsEntryColumn(endVariable)) {
metaData.setEntryColumn(endVariable, EmbeddingMetaData.EntryType.VERTEX,
inputMetaData.getEntryCount() + 1);
}
return metaData;
}
/**
* According to the specified {@link #vertexStrategy} and the specified
* {@link EmbeddingMetaData}, the method returns the columns that need to contain distinct
* entries.
*
* @param metaData meta data for the embedding
* @return distinct vertex columns
*/
private List<Integer> getDistinctVertexColumns(EmbeddingMetaData metaData) {
return this.vertexStrategy == MatchStrategy.ISOMORPHISM ? metaData.getVertexVariables().stream()
.map(metaData::getEntryColumn)
.collect(Collectors.toList()) : Collections.emptyList();
}
/**
* According to the specified {@link #edgeStrategy} and the specified
* {@link EmbeddingMetaData}, the method returns the columns that need to contain distinct
* entries.
*
* @param metaData meta data for the embedding
* @return distinct edge columns
*/
private List<Integer> getDistinctEdgeColumns(EmbeddingMetaData metaData) {
return edgeStrategy == MatchStrategy.ISOMORPHISM ?
metaData.getEdgeVariables().stream()
.map(metaData::getEntryColumn)
.collect(Collectors.toList()) : Collections.emptyList();
}
@Override
public String toString() {
return String.format("ExpandEmbeddingsNode={" +
"startVariable='%s', " +
"pathVariable='%s', " +
"endVariable='%s', " +
"lowerBound=%d, " +
"upperBound=%d, " +
"expandDirection=%s, " +
"vertexMorphismType=%s, " +
"edgeMorphismType=%s}",
startVariable, pathVariable, endVariable, lowerBound, upperBound, expandDirection,
vertexStrategy, edgeStrategy);
}
}