Printer.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.flink.model.impl.operators.matching.common.debug;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.gradoop.common.model.impl.properties.PropertyValue;

import java.util.List;
import java.util.Map;

/**
 * Base class for printing debug output using vertex and edge mappings. Both
 * map an internal id to a readable numeric id.
 *
 * @param <IN> input type
 * @param <K> key type
 */
@FunctionAnnotation.ForwardedFields("*")
public abstract class Printer<IN, K> extends RichMapFunction<IN, IN> {
  /**
   * Broadcast set name for vertex mapping
   */
  public static final String VERTEX_MAPPING = "vertexMapping";
  /**
   * Broadcast set name for edge mapping
   */
  public static final String EDGE_MAPPING = "edgeMapping";
  /**
   * Logger
   */
  private static final Logger LOG = LogManager.getLogger(Printer.class);
  /**
   * Mapping {@code gradoopId -> propertyValue}
   *
   * The value is used to represent the vertex with the corresponding id.
   */
  protected Map<K, PropertyValue> vertexMap;
  /**
   * Mapping {@code gradoopId -> propertyValue}
   *
   * The value is used to represent the edge with the corresponding id.
   */
  protected Map<K, PropertyValue> edgeMap;
  /**
   * String is put in front of debug output.
   */
  protected final String prefix;
  /**
   * Used to differ between iterative and non-iterative runtime context.
   */
  protected final boolean isIterative;
  /**
   * Number to display if not in iterative context but iteration is set.
   */
  private final int iterationNumber;
  /**
   * Constructor
   */
  public Printer() {
    this(false, "");
  }

  /**
   * Constructor
   *
   * Automatically determines the current iteration when instantiated in Flink iteration.
   *
   * @param isIterative true, if called in a Flink iterative context (i.e. bulk/delta iteration)
   * @param prefix      prefix for output
   */
  public Printer(boolean isIterative, String prefix) {
    this.isIterative     = isIterative;
    this.iterationNumber = 0;
    this.prefix          = prefix;
  }

  /**
   * Constructor
   *
   * Requires the manual definition of the current iteration. This is necessary when called
   * outside a Flink iteration, but in an iterative context (e.g. a Java loop).
   *
   * @param iterationNumber current iteration
   * @param prefix          prefix for debug string
   */
  public Printer(int iterationNumber, String prefix) {
    this.isIterative     = false;
    this.iterationNumber = iterationNumber;
    this.prefix          = prefix;
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    List<Tuple2<K, PropertyValue>> vertexMapping = getRuntimeContext()
      .getBroadcastVariable(VERTEX_MAPPING);
    vertexMap = initMapping(vertexMapping);
    List<Tuple2<K, PropertyValue>> edgeMapping = getRuntimeContext()
      .getBroadcastVariable(EDGE_MAPPING);
    edgeMap = initMapping(edgeMapping);
  }

  @Override
  public IN map(IN in) throws Exception {
    getLogger().debug(String.format("%s%s", getHeader(), getDebugString(in)));
    return in;
  }

  /**
   * Returns the debug string representation of the concrete Object.
   *
   * @param in input object
   * @return debug string representation for input object
   */
  protected abstract String getDebugString(IN in);

  /**
   * Returns the logger for the concrete subclass.
   *
   * @return logger
   */
  protected abstract Logger getLogger();

  /**
   * Builds the header for debug string which contains the prefix and the
   * superstep number (0 if non-iterative).
   *
   * @return debug header
   */
  private String getHeader() {
    return String.format("[%d][%s]: ",
      isIterative ? getIterationRuntimeContext().getSuperstepNumber() : iterationNumber,
      prefix != null && !prefix.isEmpty() ? prefix : " ");
  }

  /**
   * Used to initialize vertex and edge mapping.
   *
   * @param tuples broadcast set tuples
   * @return mapping
   */
  private Map<K, PropertyValue> initMapping(
    List<Tuple2<K, PropertyValue>> tuples) {
    Map<K, PropertyValue> map = Maps.newHashMap();
    for (Tuple2<K, PropertyValue> tuple : tuples) {
      map.put(tuple.f0, tuple.f1);
    }
    return map;
  }

  /**
   * Converts a list of ids into a list of corresponding property values.
   *
   * @param ids       gradoop ids
   * @param isVertex  true - use vertex mapping, false - use edge mapping
   * @return list of property values
   */
  protected List<PropertyValue> convertList(List<K> ids, boolean isVertex) {
    List<PropertyValue> result = Lists.newArrayListWithCapacity(ids.size());
    for (K gradoopId : ids) {
      result.add(isVertex ? vertexMap.get(gradoopId) : edgeMap.get(gradoopId));
    }
    return result;
  }

  /**
   * Checks if log level is debug and the mappings are initialized.
   *
   * @param vertexMapping mapping between vertex id and debug property value
   * @param edgeMapping   mapping between edge id and debug property value
   * @param <K>           key type
   * @return true, iff logging is enabled
   */
  private static <K> boolean isDebugEnabled(
    DataSet<Tuple2<K, PropertyValue>> vertexMapping,
    DataSet<Tuple2<K, PropertyValue>> edgeMapping) {
    return LOG.isDebugEnabled() && vertexMapping != null && edgeMapping != null;
  }

  /**
   * Prints out logging information if logging is enabled and mapping information is available.
   *
   * @param dataSet       dataset to be logged
   * @param printer       prints each dataset row (or a subset of it)
   * @param vertexMapping mapping between vertex id and debug property value
   * @param edgeMapping   mapping between edge id and debug property value
   * @param <T>           dataset type
   * @param <K>           key type
   * @return unmodified dataset
   */
  public static <T, K> DataSet<T> log(DataSet<T> dataSet, Printer<T, K> printer,
    DataSet<Tuple2<K, PropertyValue>> vertexMapping,
    DataSet<Tuple2<K, PropertyValue>> edgeMapping) {
    if (isDebugEnabled(vertexMapping, edgeMapping)) {
      return dataSet
        .map(printer)
        .withBroadcastSet(vertexMapping, Printer.VERTEX_MAPPING)
        .withBroadcastSet(edgeMapping, Printer.EDGE_MAPPING);
    } else {
      return dataSet;
    }
  }
}