Faster (approximate) computation of variances needed

2023-09-11 00:16:37 Author: 久伴酒伴久绊


I can see with the CPU profiler that compute_variances() is the bottleneck of my project.

  %   cumulative   self              self     total           
 time   seconds   seconds    calls  ms/call  ms/call  name    
 75.63      5.43     5.43       40   135.75   135.75  compute_variances(unsigned int, std::vector<Point, std::allocator<Point> > const&, float*, float*, unsigned int*)
 19.08      6.80     1.37                             readDivisionSpace(Division_Euclidean_space&, char*)
 ...

Here is the body of the function:

void compute_variances(size_t t, const std::vector<Point>& points, float* avg,
                       float* var, size_t* split_dims) {
  for (size_t d = 0; d < points[0].dim(); d++) {
    avg[d] = 0.0;
    var[d] = 0.0;
  }
  float delta, n;
  for (size_t i = 0; i < points.size(); ++i) {
    n = 1.0 + i;
    for (size_t d = 0; d < points[0].dim(); ++d) {
      delta = (points[i][d]) - avg[d];
      avg[d] += delta / n;
      var[d] += delta * ((points[i][d]) - avg[d]);
    }
  }

  /* Find t dimensions with largest scaled variance. */
  kthLargest(var, points[0].dim(), t, split_dims);
}

where kthLargest() doesn't seem to be a problem, since I see that:

0.00 7.18 0.00 40 0.00 0.00 kthLargest(float*, int, int, unsigned int*)

compute_variances() takes a vector of vectors of floats (i.e. a vector of Point objects, where Point is a class I have implemented) and computes their variance in each dimension, using Knuth's online algorithm.
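For reference, here is a minimal single-variable sketch of that recurrence (Welford's formulation of Knuth's algorithm); the names are illustrative, not from my project:

#include <cstddef>

// One online update step (Welford/Knuth). After n samples, mean holds the
// running mean and M2 the sum of squared deviations; divide M2 by n (or
// n - 1) to obtain the variance. In compute_variances(), avg[d] plays the
// role of mean and var[d] the role of M2, per dimension.
void welford_update(float x, std::size_t n, float& mean, float& M2) {
  const float delta = x - mean;
  mean += delta / static_cast<float>(n);  // n is the 1-based sample count
  M2   += delta * (x - mean);             // uses the updated mean
}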

Here is how I call the function:

float avg[(*points)[0].dim()];
float var[(*points)[0].dim()];
size_t split_dims[t];

compute_variances(t, *points, avg, var, split_dims);

The question is: can I do better? I would be really happy to accept a trade-off between speed and an approximate computation of the variances. Or maybe I could make the code more cache-friendly, or something like that?

I compiled like this:

g++ main_noTime.cpp -std=c++0x -p -pg -O3 -o eg

Notice that before the edit I had used -o3, not with a capital 'O'. Thanks to ypnos, I now compile with the optimization flag -O3. I am sure there was a difference between them, since I performed time measurements with one of these methods on my pseudo-site.

Note that now, compute_variances is dominating the overall project's time!

[EDIT]

compute_variances() is called 40 times.

Per 10 calls, the following hold true:

points.size() = 1000   and points[0].dim = 10000
points.size() = 10000  and points[0].dim = 100
points.size() = 10000  and points[0].dim = 10000
points.size() = 100000 and points[0].dim = 100

Each call handles different data.

Q: How fast is access to points[i][d]?

A: points[i] is just the i-th element of the std::vector; the second [] is implemented like this in the Point class:

const FT& operator [](const int i) const {
  if (i < (int) coords.size() && i >= 0)
     return coords.at(i);
  else {
     std::cout << "Error at Point::[]" << std::endl;
     exit(1);
  }
  return coords[0];  // Clear -Wall warning 
}

where coords is a std::vector of float values. This seems a bit heavy, but shouldn't the compiler be smart enough to predict correctly that the branch is always true? (I mean after the cold start.) Moreover, std::vector::at() is supposed to be constant time (as stated in the ref). I changed the body of the function to use only .at(), and the time measurements remained pretty much the same.
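If the per-element check does matter, one option is to hoist it out of the hot loop: check the dimension once per point, then index contiguous storage directly. A sketch, assuming an accessor like the constData() shown in the answer below:

#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper: one update step over all dimensions of point i,
// with the bounds check hoisted out of the hot loop.
void accumulate_point(const std::vector<Point>& points, std::size_t i,
                      float* avg, float* var) {
  const std::size_t D = points[0].dim();
  assert(points[i].dim() == D);            // one check per point
  const float* p = points[i].constData();  // raw pointer into coords
  const float i_n = 1.0f / (1.0f + i);     // one division per point
  for (std::size_t d = 0; d < D; ++d) {
    const float delta = p[d] - avg[d];     // unchecked contiguous load
    avg[d] += delta * i_n;
    var[d] += delta * (p[d] - avg[d]);
  }
}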

The division in compute_variances() is for sure something heavy! However, Knuth's algorithm is numerically stable, and I was not able to find another algorithm that is both numerically stable and division-free.
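For completeness, one division-free alternative is the textbook shifted-data formula, Var(x) = (Σ(x−K)² − (Σ(x−K))²/n) / n: a single pass with only two divisions per dimension, at the very end. It is less robust than Knuth's algorithm when the variance is tiny relative to the magnitudes, but a shift K taken from the data (e.g. the first sample) keeps the cancellation under control. A single-variable sketch, not from the original post:

#include <cstddef>

// Shifted-data one-pass variance: the inner loop is division-free.
// K is a shift close to the data (here, the first sample), chosen to
// limit catastrophic cancellation; returns the population variance.
float shifted_variance(const float* x, std::size_t n) {
  const float K = x[0];
  float sum = 0.0f, sum_sq = 0.0f;
  for (std::size_t i = 0; i < n; ++i) {
    const float s = x[i] - K;
    sum    += s;       // running shifted sum
    sum_sq += s * s;   // running shifted sum of squares
  }
  return (sum_sq - sum * sum / n) / n;  // only two divisions in total
}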

Note that I am not interested in parallelism right now.

[EDIT.2]

Minimal example of the Point class (I think I haven't forgotten to show anything):

class Point {
 public:

  typedef float FT;

  ...

  /**
   * Get dimension of point.
   */
  size_t dim() const {
    return coords.size();
  }

  /**
   * Operator that returns the coordinate at the given index.
   * @param i - index of the coordinate
   * @return the coordinate at index i
   */
  FT& operator [](const int i) {
    return coords.at(i);
    //it's the same if I have the commented code below
    /*if (i < (int) coords.size() && i >= 0)
      return coords.at(i);
    else {
      std::cout << "Error at Point::[]" << std::endl;
      exit(1);
    }
    return coords[0];  // Clear -Wall warning*/
  }

  /**
   * Operator that returns the coordinate at the given index. (constant)
   * @param i - index of the coordinate
   * @return the coordinate at index i
   */
  const FT& operator [](const int i) const {
        return coords.at(i);
    /*if (i < (int) coords.size() && i >= 0)
      return coords.at(i);
    else {
      std::cout << "Error at Point::[]" << std::endl;
      exit(1);
    }
    return coords[0];  // Clear -Wall warning*/
  }

 private:
  std::vector<FT> coords;
};

Solution

Here's what I would do, in guesstimated order of importance:

1. Return the floating-point value from Point::operator[] by value, not by reference.

2. Use coords[i] instead of coords.at(i), since you already assert that the index is within bounds. The at member checks the bounds; you only need to check them once.

3. Replace the home-baked error indication/checking in Point::operator[] with an assert. That's what asserts are for. They are nominally no-ops in release mode - I doubt that you need the check in release code.

4. Replace the repeated division with a single division and repeated multiplication.

5. Remove the need for the wasted initialization by unrolling the first two iterations of the outer loop.

6. To lessen the impact of cache misses, run the inner loop alternately forwards, then backwards. This at least gives you a chance of reusing some cached avg and var values. It may in fact remove all cache misses on avg and var, if prefetching works on reverse iteration order, as it well should.

7. On modern C++ compilers, std::fill and std::copy can leverage type alignment and have a chance of being faster than the C library's memset and memcpy.

The Point::operator[] will have a chance of getting inlined in the release build and can reduce to two machine instructions (effective address computation and floating point load). That's what you want. Of course it must be defined in the header file, otherwise the inlining will only be performed if you enable link-time code generation (a.k.a. LTO).
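For example, with GCC, link-time code generation is requested by adding -flto to both compilation and linking; an illustrative command line, adapted from the question's flags:

g++ main_noTime.cpp -std=c++0x -O3 -flto -o eg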

Note that the Point::operator[]'s body is only equivalent to the single-line return coords.at(i) in a debug build. In a release build the entire body is equivalent to return coords[i], not return coords.at(i).

#include <cassert>    // assert
#include <algorithm>  // std::copy, std::copy_n, std::fill_n

// Note: in an out-of-class definition, the return type must be
// qualified as Point::FT, since it is looked up before the class scope
// is entered.
Point::FT Point::operator[](int i) const {
  assert(i >= 0 && i < (int)coords.size());
  return coords[i];
}

const Point::FT * Point::constData() const {
  return &coords[0];
}

void compute_variances(size_t t, const std::vector<Point>& points, float* avg,
                       float* var, size_t* split_dims)
{
  assert(points.size() > 0);
  const int D = points[0].dim();

  // i = 0, i_n = 1
  assert(D > 0);
#if __cplusplus >= 201103L
  std::copy_n(points[0].constData(), D, avg);
#else
  std::copy(points[0].constData(), points[0].constData() + D, avg);
#endif

  // i = 1, i_n = 0.5
  if (points.size() >= 2) {
    assert(points[1].dim() == D);
    for (int d = D - 1; d >= 0; --d) {
      float const delta = points[1][d] - avg[d];
      avg[d] += delta * 0.5f;
      var[d] = delta * (points[1][d] - avg[d]);
    }
  } else {
    std::fill_n(var, D, 0.0f);
  }

  // i = 2, ...
  for (size_t i = 2; i < points.size(); ) {
    {
      const float i_n = 1.0f / (1.0f + i);
      assert(points[i].dim() == D);
      for (int d = 0; d < D; ++d) {
        float const delta = points[i][d] - avg[d];
        avg[d] += delta * i_n;
        var[d] += delta * (points[i][d] - avg[d]);
      }
    }
    ++ i;
    if (i >= points.size()) break;
    {
      const float i_n = 1.0f / (1.0f + i);
      assert(points[i].dim() == D);      
      for (int d = D - 1; d >= 0; --d) {
        float const delta = points[i][d] - avg[d];
        avg[d] += delta * i_n;
        var[d] += delta * (points[i][d] - avg[d]);
      }
    }
    ++ i;
  }

  /* Find t dimensions with largest scaled variance. */
  kthLargest(var, D, t, split_dims);
}
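As a side note on the call site: the float avg[(*points)[0].dim()] buffers in the question are variable-length arrays, a GCC extension rather than standard C++. A hypothetical call site using std::vector for the buffers instead:

#include <vector>

// points and t as in the question; the vectors own the output buffers.
const size_t D = (*points)[0].dim();
std::vector<float> avg(D), var(D);
std::vector<size_t> split_dims(t);

compute_variances(t, *points, &avg[0], &var[0], &split_dims[0]);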