    塔崽

    Giraph Source Code Analysis (8): Counting the Vertices That Participate in Computation in Each SuperStep

    Posted by the original poster on 2019-08-20 11:06:26

    Author | 白松 (Bai Song)

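    Purpose: in my research I needed to know how many vertices take part in computation in each iteration, so that the system could be optimized further. Take SSSP as an example. The last line of compute() calls voteToHalt() on the current vertex, which switches it to the InActive state. As a result, every vertex is inactive when an iteration finishes. After the global synchronization barrier, each vertex that has received messages is reactivated (Active state), and its compute() method is called. This article counts the vertices that participate in computation in each iteration. The compute() method of SSSP is listed below: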

    @Override
    public void compute(Iterable<DoubleWritable> messages) {
        if (getSuperstep() == 0) {
            setValue(new DoubleWritable(Double.MAX_VALUE));
        }
        double minDist = isSource() ? 0d : Double.MAX_VALUE;
        for (DoubleWritable message : messages) {
            minDist = Math.min(minDist, message.get());
        }
        if (minDist < getValue().get()) {
            setValue(new DoubleWritable(minDist));
            for (Edge<LongWritable, FloatWritable> edge : getEdges()) {
                double distance = minDist + edge.getValue().get();
                sendMessage(edge.getTargetVertexId(), new DoubleWritable(distance));
            }
        }
        // Switch the vertex to the InActive state
        voteToHalt();
    }

    Note: in Giraph, an algorithm terminates only when there are no active vertices and no messages are being passed between workers.

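    In hama-0.6.0, the termination condition only checks whether any active vertices remain. This does not faithfully implement the Pregel model; it is a half-finished implementation.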
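    To make the difference between the two termination rules concrete, here is a minimal sketch. It assumes only two numbers are tracked after each superstep: the count of active vertices and the count of messages still waiting to be delivered. The class and method names are hypothetical and are not Giraph or Hama APIs. The Hama rule follows the author's description of hama-0.6.0, not its source code.

```java
/**
 * Hypothetical sketch contrasting the two termination checks described above.
 * Neither method is a real Giraph or Hama API.
 */
final class TerminationCheck {
    private TerminationCheck() {}

    /** Giraph-style: stop only when no vertex is active AND no message is in flight. */
    static boolean giraphShouldHalt(long activeVertices, long pendingMessages) {
        return activeVertices == 0 && pendingMessages == 0;
    }

    /** hama-0.6.0 as described in the text: stop as soon as no vertex is active. */
    static boolean hamaShouldHalt(long activeVertices) {
        return activeVertices == 0;
    }

    public static void main(String[] args) {
        // Every vertex has voted to halt, but 3 messages are still waiting to be delivered.
        long active = 0;
        long pending = 3;
        System.out.println("Giraph halts: " + giraphShouldHalt(active, pending)); // false
        System.out.println("Hama halts:   " + hamaShouldHalt(active));            // true
    }
}
```

    In this situation the Giraph-style check keeps running, so the pending messages can reactivate their target vertices. The active-vertices-only check stops and drops those messages.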

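    The modifications are as follows:

    1. The org.apache.giraph.partition.PartitionStats class

    Add a field and methods that count, for each partition, how many vertices participate in computation in each superstep. The added field and methods are: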

    /** computed vertices in this partition */
    private long computedVertexCount = 0;

    /**
     * Increment the computed vertex count by one.
     */
    public void incrComputedVertexCount() {
        ++computedVertexCount;
    }

    /**
     * @return the computedVertexCount
     */
    public long getComputedVertexCount() {
        return computedVertexCount;
    }

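    Modify the readFields() and write() methods by appending one statement to the end of each. When a partition finishes computing, its stats, including computedVertexCount, are sent to the master. The master then reads them and aggregates the totals. Because readFields() must consume fields in exactly the order write() produced them, the new field is appended last in both methods.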

    @Override
    public void readFields(DataInput input) throws IOException {
        partitionId = input.readInt();
        vertexCount = input.readLong();
        finishedVertexCount = input.readLong();
        edgeCount = input.readLong();
        messagesSentCount = input.readLong();
        // Added: read the new field last, mirroring write()
        computedVertexCount = input.readLong();
    }

    @Override
    public void write(DataOutput output) throws IOException {
        output.writeInt(partitionId);
        output.writeLong(vertexCount);
        output.writeLong(finishedVertexCount);
        output.writeLong(edgeCount);
        output.writeLong(messagesSentCount);
        // Added: write the new field last
        output.writeLong(computedVertexCount);
    }
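    Because write() and readFields() must agree on field order, a byte-level round trip is an easy way to check the change. The sketch below is a simplified, hypothetical copy of PartitionStats. It uses only java.io, not the Hadoop Writable interface. It writes the stats to a byte array, as a worker does before reporting to the master, and then reads them back into a fresh object.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical, Hadoop-free stand-in for PartitionStats with the same field order. */
class PartitionStatsSketch {
    int partitionId;
    long vertexCount;
    long finishedVertexCount;
    long edgeCount;
    long messagesSentCount;
    long computedVertexCount;  // the newly added field

    void write(DataOutput out) throws IOException {
        out.writeInt(partitionId);
        out.writeLong(vertexCount);
        out.writeLong(finishedVertexCount);
        out.writeLong(edgeCount);
        out.writeLong(messagesSentCount);
        out.writeLong(computedVertexCount);   // appended at the end
    }

    void readFields(DataInput in) throws IOException {
        partitionId = in.readInt();
        vertexCount = in.readLong();
        finishedVertexCount = in.readLong();
        edgeCount = in.readLong();
        messagesSentCount = in.readLong();
        computedVertexCount = in.readLong();  // read back from the same position
    }

    /** Serialize to bytes, then deserialize into a fresh object (worker -> master hop). */
    static PartitionStatsSketch roundTrip(PartitionStatsSketch src) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            src.write(out);
            out.flush();
            PartitionStatsSketch dst = new PartitionStatsSketch();
            dst.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return dst;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

    Suppose the new field were appended to write() but not to readFields(), or were placed at a different position. The reader would then consume the wrong bytes for that field and for anything serialized after it.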
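
    2. The org.apache.giraph.graph.GlobalStats class

    Add a field and a getter that hold the total number of vertices participating in computation in each superstep, summed over all partitions on every worker.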

    /**
     * Number of vertices that executed compute() in this superstep,
     * summed over all partitions on all workers.
     * Added by BaiSong
     */
    private long computedVertexCount = 0;

    /**
     * @return the computedVertexCount
     */
    public long getComputedVertexCount() {
        return computedVertexCount;
    }

    Modify the addPartitionStats(PartitionStats partitionStats) method so that it also accumulates computedVertexCount.

    /**
     * Add the stats of a partition to the global stats.
     *
     * @param partitionStats Partition stats to be added.
     */
    public void addPartitionStats(PartitionStats partitionStats) {
        this.vertexCount += partitionStats.getVertexCount();
        this.finishedVertexCount += partitionStats.getFinishedVertexCount();
        this.edgeCount += partitionStats.getEdgeCount();
        // Added by BaiSong: the following line
        this.computedVertexCount += partitionStats.getComputedVertexCount();
    }
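    On the master side, aggregation is simply a sum over the stats of every partition. The minimal sketch below uses hypothetical stand-in classes (PartitionCounts and GlobalCounts are not Giraph classes). It keeps only the two counters that matter here.

```java
import java.util.List;

/** Hypothetical, simplified stand-in for one partition's PartitionStats. */
class PartitionCounts {
    final long vertexCount;
    final long computedVertexCount;

    PartitionCounts(long vertexCount, long computedVertexCount) {
        this.vertexCount = vertexCount;
        this.computedVertexCount = computedVertexCount;
    }
}

/** Hypothetical, simplified stand-in for GlobalStats. */
class GlobalCounts {
    long vertexCount;
    long computedVertexCount;

    /** Same idea as addPartitionStats(): add one partition's numbers to the totals. */
    void addPartitionStats(PartitionCounts p) {
        vertexCount += p.vertexCount;
        computedVertexCount += p.computedVertexCount;
    }

    /** Master-side aggregation over every partition reported by every worker. */
    static GlobalCounts aggregate(List<PartitionCounts> allPartitions) {
        GlobalCounts global = new GlobalCounts();
        for (PartitionCounts p : allPartitions) {
            global.addPartitionStats(p);
        }
        return global;
    }
}
```

    For example, suppose three partitions report (4 vertices, 2 computed), (3, 1) and (2, 1). They aggregate to 9 vertices and 4 computed vertices.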

    For easier debugging, you can also (optionally) modify the class's toString() method. The modified version is:

    @Override
    public String toString() {
        return "(vtx=" + vertexCount + ",computedVertexCount="
                + computedVertexCount + ",finVtx=" + finishedVertexCount
                + ",edges=" + edgeCount + ",msgCount=" + messageCount
                + ",haltComputation=" + haltComputation + ")";
    }

    3. The org.apache.giraph.graph.ComputeCallable<I,V,E,M> class

    Add the counting logic: in the computePartition() method, add the statement shown below.

    if (!vertex.isHalted()) {
        context.progress();
        TimerContext computeOneTimerContext = computeOneTimer.time();
        try {
            vertex.compute(messages);
            // Added: once the vertex has finished compute(),
            // increment this partition's computedVertexCount by one
            partitionStats.incrComputedVertexCount();
        } finally {
            computeOneTimerContext.stop();
        }
    ……
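    The counting rule above can be checked end to end with a toy, single-process Pregel loop. This is a hypothetical sketch, not Giraph code. Vertices live in plain arrays. A halted vertex that receives messages is reactivated, as described at the start of the article. The counter is incremented at the same point as incrComputedVertexCount(). The per-vertex logic mirrors the SSSP compute() shown at the top, and the loop stops under the Giraph-style condition: no active vertices and no pending messages.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical toy BSP engine that counts vertices executing compute() per superstep. */
class ComputedVertexSimulation {
    /** Directed weighted edge (hypothetical helper type). */
    static final class Edge {
        final int target;
        final double weight;

        Edge(int target, double weight) {
            this.target = target;
            this.weight = weight;
        }
    }

    final double[] value;          // current shortest distance of each vertex
    final boolean[] halted;        // true after voteToHalt()
    final List<List<Edge>> edges;  // adjacency list indexed by vertex id
    final int source;

    ComputedVertexSimulation(List<List<Edge>> edges, int source) {
        this.edges = edges;
        this.source = source;
        this.value = new double[edges.size()];
        this.halted = new boolean[edges.size()];  // all vertices start active
    }

    /** Runs until no vertex is active and no message is pending; returns per-superstep counts. */
    List<Long> run() {
        List<Long> computedPerSuperstep = new ArrayList<>();
        Map<Integer, List<Double>> inbox = new HashMap<>();
        for (long superstep = 0; ; superstep++) {
            Map<Integer, List<Double>> outbox = new HashMap<>();  // delivered next superstep
            long computed = 0;  // plays the role of computedVertexCount
            for (int v = 0; v < value.length; v++) {
                List<Double> messages = inbox.getOrDefault(v, Collections.emptyList());
                if (halted[v] && !messages.isEmpty()) {
                    halted[v] = false;  // a message reactivates a halted vertex
                }
                if (!halted[v]) {
                    compute(v, superstep, messages, outbox);
                    computed++;         // same spot as incrComputedVertexCount()
                }
            }
            computedPerSuperstep.add(computed);
            inbox = outbox;
            boolean anyActive = false;
            for (boolean h : halted) {
                anyActive |= !h;
            }
            if (!anyActive && inbox.isEmpty()) {
                return computedPerSuperstep;
            }
        }
    }

    /** SSSP compute(), mirroring SimpleShortestPathsVertex.compute() shown earlier. */
    private void compute(int v, long superstep, List<Double> messages,
                         Map<Integer, List<Double>> outbox) {
        if (superstep == 0) {
            value[v] = Double.MAX_VALUE;
        }
        double minDist = (v == source) ? 0d : Double.MAX_VALUE;
        for (double m : messages) {
            minDist = Math.min(minDist, m);
        }
        if (minDist < value[v]) {
            value[v] = minDist;
            for (Edge e : edges.get(v)) {
                outbox.computeIfAbsent(e.target, k -> new ArrayList<>()).add(minDist + e.weight);
            }
        }
        halted[v] = true;  // voteToHalt()
    }
}
```

    Consider a hypothetical 4-vertex graph with edges 0→1 (weight 1), 0→2 (4), 1→2 (1) and 2→3 (1), with source 0. On this graph, run() returns [4, 2, 2, 1], and the final distances are 0, 1, 2, 3. This simplified loop stops right after the first superstep that sends no messages. It does not try to reproduce every detail of Giraph's superstep scheduling.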
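
    4. Add Hadoop Counters statistics. This works the same way as in my earlier post [Giraph Source Code Analysis (7): Adding Message Statistics](https://bbs.dtwave.com/forum/show/59), so it is not described in detail here. The added class is org.apache.giraph.counters.GiraphComputedVertex, and its source code follows: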

    package org.apache.giraph.counters;

    import java.util.Iterator;
    import java.util.Map;

    import org.apache.hadoop.mapreduce.Mapper.Context;

    import com.google.common.collect.Maps;

    /**
     * Hadoop Counters in group "Giraph Computed Vertex" for counting, in every
     * superstep, the number of vertices that executed compute().
     */
    public class GiraphComputedVertex extends HadoopCountersBase {
        /** Counter group name for the computed-vertex counters */
        public static final String GROUP_NAME = "Giraph Computed Vertex";

        /** Singleton instance for everyone to use */
        private static GiraphComputedVertex INSTANCE;

        /** Per-superstep counters of computed vertices, keyed by superstep */
        private final Map<Long, GiraphHadoopCounter> superstepVertexCount;

        private GiraphComputedVertex(Context context) {
            super(context, GROUP_NAME);
            superstepVertexCount = Maps.newHashMap();
        }

        /**
         * Instantiate with Hadoop Context.
         *
         * @param context Hadoop Context to use.
         */
        public static void init(Context context) {
            INSTANCE = new GiraphComputedVertex(context);
        }

        /**
         * Get singleton instance.
         *
         * @return singleton GiraphComputedVertex instance.
         */
        public static GiraphComputedVertex getInstance() {
            return INSTANCE;
        }

        /**
         * Get the computed-vertex counter for a superstep, creating it on first use.
         *
         * @param superstep superstep number
         * @return counter for that superstep
         */
        public GiraphHadoopCounter getSuperstepVertexCount(long superstep) {
            GiraphHadoopCounter counter = superstepVertexCount.get(superstep);
            if (counter == null) {
                String counterPrefix = "Superstep: " + superstep + " ";
                counter = getCounter(counterPrefix);
                superstepVertexCount.put(superstep, counter);
            }
            return counter;
        }

        @Override
        public Iterator<GiraphHadoopCounter> iterator() {
            return superstepVertexCount.values().iterator();
        }
    }
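    getSuperstepVertexCount() creates each superstep's counter lazily. It looks the counter up, and it creates and caches one only on a miss. The sketch below shows the same pattern in plain Java. It uses Map.computeIfAbsent, with AtomicLong standing in for the Hadoop counter. This is a hypothetical analogue and is not part of Giraph.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical plain-Java analogue of GiraphComputedVertex's lazy per-superstep counters. */
class SuperstepCounterRegistry {
    private final Map<Long, AtomicLong> countersBySuperstep = new HashMap<>();

    /** Equivalent of getSuperstepVertexCount(superstep): reuse the counter or create it once. */
    AtomicLong getSuperstepVertexCount(long superstep) {
        return countersBySuperstep.computeIfAbsent(superstep, s -> new AtomicLong());
    }

    /** Number of supersteps that have a counter so far. */
    int size() {
        return countersBySuperstep.size();
    }
}
```

    Like the original, the registry is backed by a plain HashMap, so it assumes it is accessed from a single thread.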
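
    5. Experimental results. After the program runs, the terminal prints the total number of vertices that participated in computation in each iteration. SSSP (the SimpleShortestPathsVertex class) was tested on an input graph with 9 vertices and 12 edges. The output is as follows: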

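    The test shown above ran for 6 iterations. The red box shows the number of vertices that participated in computation in each iteration: 9, 4, 4, 3, 4, 0.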

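    Explanation: in superstep 0 every vertex is active, so all 9 vertices compute. In superstep 5 no vertex computes, so no messages are sent. Every vertex is also inactive, so the algorithm terminates.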

     
