UpdateVertexState.java
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.matching.single.simulation.dual.functions;
import org.apache.flink.api.common.functions.RichJoinFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.configuration.Configuration;
import org.gradoop.flink.model.impl.operators.matching.single.simulation.dual.tuples
.IdPair;
import org.gradoop.flink.model.impl.operators.matching.single.simulation.dual.tuples
.Message;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.flink.model.impl.operators.matching.common.query.QueryHandler;
import org.gradoop.flink.model.impl.operators.matching.single.simulation.dual.tuples.FatVertex;
import org.gradoop.flink.model.impl.operators.matching.single.simulation.dual.util.MessageType;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
/**
* Updates the state of a {@link FatVertex} according to the message it
* receives.
*
* Forwarded Fields First:
*
* f0: vertex id
*/
@FunctionAnnotation.ForwardedFieldsFirst("f0")
public class UpdateVertexState
extends RichJoinFunction<FatVertex, Message, FatVertex> {
/**
* serial version uid
*/
private static final long serialVersionUID = 42L;
/**
* GDL query
*/
private final String query;
/**
* Query handler
*/
private transient QueryHandler queryHandler;
/**
* Constructor
*
* @param query GDL query
*/
public UpdateVertexState(String query) {
this.query = query;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
queryHandler = new QueryHandler(query);
}
@Override
public FatVertex join(FatVertex fatVertex, Message message) throws Exception {
if (message != null) {
// TODO: message to SELF should be processed first
for (int i = 0; i < message.getSenderIds().size(); i++) {
processDeletion(fatVertex, message.getSenderIds().get(i),
message.getDeletions().get(i), message.getMessageTypes().get(i));
}
fatVertex.setUpdated(true);
} else {
fatVertex.setUpdated(false);
}
return fatVertex;
}
/**
* Processes a deletion on the current vertex.
*
* @param fatVertex fat vertex
* @param senderId sender vertexId
* @param deletion sender vertex candidate deletion id
* @param messageType message type
*/
private void processDeletion(FatVertex fatVertex, GradoopId senderId,
Long deletion, MessageType messageType) {
switch (messageType) {
case FROM_SELF:
updateCandidates(fatVertex, deletion);
break;
case FROM_CHILD:
updateOutgoingEdges(fatVertex,
queryHandler.getEdgeIdsByTargetVertexId(deletion), senderId);
break;
case FROM_PARENT:
updateIncomingEdges(fatVertex,
queryHandler.getEdgeIdsBySourceVertexId(deletion));
break;
case FROM_CHILD_REMOVE:
updateOutgoingEdges(fatVertex, senderId);
break;
case FROM_PARENT_REMOVE:
updateIncomingEdges(fatVertex,
queryHandler.getEdgeIdsBySourceVertexId(deletion));
updateParentIds(fatVertex, senderId);
break;
default:
throw new IllegalArgumentException("Unsupported type: " + messageType);
}
}
/**
* Removes the sender vertex id from the parents of the current vertex.
*
* @param fatVertex fat vertex
* @param senderId sender vertex id
*/
private void updateParentIds(FatVertex fatVertex, GradoopId senderId) {
fatVertex.getParentIds().remove(senderId);
}
/**
* Updates the candidates of the given vertex. If the vertex has still
* candidates left, the outgoing edges will be updated accordingly.
*
* @param fatVertex fat vertex
* @param deletion vertex candidate to be removed
*/
private void updateCandidates(FatVertex fatVertex, Long deletion) {
fatVertex.getCandidates().remove(deletion);
if (!fatVertex.getCandidates().isEmpty()) {
updateOutgoingEdges(fatVertex,
queryHandler.getEdgeIdsBySourceVertexId(deletion));
}
}
/**
* Updates incoming edge of the given vertex. The method decrements the
* counter for each query edge candidate.
*
* @param fatVertex fat vertex
* @param queryEdges edge candidates to be removed
*/
private void updateIncomingEdges(FatVertex fatVertex, Collection<Long>
queryEdges) {
for (Long eQ : queryEdges) {
if (fatVertex.getIncomingCandidateCounts()[eQ.intValue()] > 0) {
fatVertex.getIncomingCandidateCounts()[eQ.intValue()]--;
}
}
}
/**
* Deletes all outgoing edges which point to the given target id.
*
* @param vertex fat vertex
* @param targetId target vertex
*/
private void updateOutgoingEdges(FatVertex vertex, GradoopId targetId) {
Iterator<IdPair> iterator = vertex.getEdgeCandidates().keySet().iterator();
while (iterator.hasNext()) {
if (iterator.next().getTargetId().equals(targetId)) {
iterator.remove();
}
}
}
/**
* Updates the outgoing edges of the given vertex according to a collection
* of edge candidates which will be deleted.
*
* @param fatVertex fat vertex
* @param queryEdges edge candidates to be removed
*/
private void updateOutgoingEdges(FatVertex fatVertex,
Collection<Long> queryEdges) {
updateOutgoingEdges(fatVertex, queryEdges, null);
}
/**
* Updates the outgoing edges of the given vertex according to a collection of
* edge candidates which will be deleted. Updates only those outgoing edges
* which point to a specific target vertex.
*
* @param fatVertex fat vertex
* @param queryEdges edge candidates to be removed
* @param targetVertex target vertex to consider
*/
private void updateOutgoingEdges(FatVertex fatVertex,
Collection<Long> queryEdges, GradoopId targetVertex) {
if (queryEdges != null) {
Iterator<Map.Entry<IdPair, boolean[]>> edgeIterator = fatVertex
.getEdgeCandidates().entrySet().iterator();
while (edgeIterator.hasNext()) {
Map.Entry<IdPair, boolean[]> e = edgeIterator.next();
if (targetVertex == null ||
e.getKey().getTargetId().equals(targetVertex)) {
// remove edge candidates for that edge
for (Long eQ : queryEdges) {
e.getValue()[eQ.intValue()] = false;
}
// remove edge if there are no candidates left
if (allFalse(e.getValue())) {
edgeIterator.remove();
}
}
}
}
}
/**
* Checks if the given array contains only false values.
*
* @param a array
* @return true, iff all values are false
*/
private boolean allFalse(boolean[] a) {
for (boolean b : a) {
if (b) {
return false;
}
}
return true;
}
}