top of page

Processing 粒子プログラムの高速化(Cell List、Verlet List、並列化)

  • 執筆者の写真: NUM
    NUM
  • 5月17日
  • 読了時間: 11分
目次
はじめに

以前、Particle Leniaという粒子を用いて生物的な形を生み出すシミュレーションの解説記事を書きました。しかし、そのプログラムはO(N²) のオーダーとなり、粒子数が増えると計算コストが急増するという課題がありました。

そこで、今回はParticleベースのプログラムで使えそうな最適化手法をいくつかピックアップし、最適化版Particle Leniaを再実装してみました。

今回は魅力的なビジュアルは無いので、最適化に興味のある方は読んでいただけると嬉しいです。

最適化手法

今回は3つの最適化手法を取り入れます。

Cell List

Cell Listとは、シミュレーション座標を小さなセル(格子)に分割し、各粒子を所属セルに格納することで近傍探索空間を大幅に削減する手法です。セル幅を相互作用カットオフ距離以上に設定し、ある粒子の近傍は自身のセルと隣接セル(Moore近傍)のみを調べることで、計算量を大幅に削減できます。

カットオフ距離 ある距離より遠い粒子間の作用をゼロとみなして計算対象から外し、計算コストを大幅に削減するために設定される閾値です。つまりカットオフ距離内の粒子が相互作用計算の対象です。

Verlet List

Verlet Listとは、相互作用カットオフ距離に加えて余裕距離(skin)を持たせ、リストを何ステップか使い回して距離計算を最小化し、余裕距離を越えた時点でリストを再構築する仕組みです。 本来のVerlet Listは粒子の移動量が余裕距離を越えるか随時監視する必要がありますが、今回は数値的正確性よりビジュアライズ性に重きを置いているため、固定ステップ方式という一定ステップでリスト再構築を行います。


Cell List+Verlet Listのイメージ図
Cell List+Verlet Listのイメージ図

今回はCell ListとVerlet Listのハイブリット方式を実装します。上図がイメージ図です。 セル自体がCell Listの概念で、粒子が各セルに割り当てられています。 赤粒子が計算対象粒子として、緑セル(自身のセルを含む隣接セル)に存在する粒子を相互作用計算対象にします。そこからVerlet Listの概念として相互作用計算対象を赤円内の粒子に絞り込みます。

並列処理

名前の通り処理の並列化を行います。Particle Leniaにおいて計算コストの高い位置更新で並列処理を行います。

実装

import java.util.concurrent.*;
import java.util.stream.IntStream;

// シミュレーションパラメータ
final int    N                     = 1000;   // 粒子数
final float  mu_k                  = 4.0f;   // カーネル中心
final float  sigma_k               = 1.0f;   // カーネル幅
final float  w_k                   = 0.022f; // カーネル重み
final float  mu_g                  = 0.6f;   // 成長関数中心
final float  sigma_g               = 0.15f;  // 成長関数幅
final float  c_rep                 = 1.0f;   // 反発強度
final float  dt                    = 0.1f;   // タイムステップ
final float  repulsionCutoff       = 1.0f;   // 反発カットオフ
final float  repulsionCutoffSq     = repulsionCutoff * repulsionCutoff; // 反発カットオフ2乗(平方距離で使用)
final float  kernelCutoff          = mu_k + 3 * sigma_k;                // カーネル有効範囲
final float  kernelCutoffSq        = kernelCutoff * kernelCutoff;       // カーネル有効範囲2乗(平方距離で使用)
final float  cellSize              = max(repulsionCutoff, kernelCutoff);
final float  verletCutoff          = max(repulsionCutoff, kernelCutoff) + 0.2;
final int    verletRebuildInterval = 60;     // Verlet List再構築間隔

// シミュレーション座標
float[] posX    = new float[N];
float[] posY    = new float[N];
float[] newPosX = new float[N];
float[] newPosY = new float[N];

int[][] neighbors     = new int[N][N]; // 近傍粒子のインデックスを保持
int[]   neighborCount = new int[N];    // 粒子i毎の近傍粒子数を保持

float minX, minY, maxX, maxY; // 境界ボックス
int stepCount = 0;            // ステップカウンタ

// スレッドセーフなデータ構造
ConcurrentHashMap<Long, ConcurrentLinkedQueue<Integer>> grid = new ConcurrentHashMap<>();

void setup() {
  size(800, 800);
  pixelDensity(displayDensity());
  frameRate(100);

  // 粒子初期化
  for (int i = 0; i < N; i++) {
    posX[i] = random(-2, 2);
    posY[i] = random(-2, 2);
  }
}

void draw() {
  background(0);
  updateParticles();

  // 描画
  noStroke();
  for (int i = 0; i < N; i++) {
    float sx = posX[i] * 10 + width / 2;
    float sy = posY[i] * 10 + height / 2;
    int r = 5;
    fill(255, 200);
    circle(sx, sy, r);
    fill(89, 183, 76, 100);
    circle(sx, sy, r*2);
  }
  stepCount++;
}

/**
 * 粒子更新
 * ・Cell List + Verlet List
 * ・勾配計算、位置更新を並列化
 */
void updateParticles() {
  // Verlet List再構築
  if (stepCount % verletRebuildInterval == 0) {
    computeBoundingBox();
    buildGrid();
    buildVerletLists();
  }
  // 勾配計算、位置更新を並列化
  IntStream.range(0, N).parallel().forEach(i -> {
    PVector repGrad = calcRepulsiveGrad(i);           // 反発勾配 ∇R の計算
    PVector growthGrad = calcGrowthGrad(i);           // 成長勾配 ∇G の計算
    PVector gradE = PVector.sub(repGrad, growthGrad); // エネルギー勾配 ∇E = ∇R - ∇G

    // 新位置の計算
    newPosX[i] = posX[i] - gradE.x * dt;
    newPosY[i] = posY[i] - gradE.y * dt;
  });
  System.arraycopy(newPosX, 0, posX, 0, N);
  System.arraycopy(newPosY, 0, posY, 0, N);
}

/**
 * カーネル関数 K(d) を計算
 *
 * @param  粒子間の距離
 * @return カーネル関数の値
 */
float calcKernel(float d) {
  return w_k * exp(-sq(d - mu_k) / sq(sigma_k));
}

/**
 * カーネル関数 K(d) の微分 K'(d) を計算
 *
 * @param  粒子間の距離
 * @return カーネル微分の値
 */
float calcKernelPrime(float d) {
  return -2 * w_k * (d - mu_k) / sq(sigma_k) * exp(-sq(d - mu_k) / sq(sigma_k));
}

/**
 * 粒子iに対する反発効果の勾配 ∇R を計算
 *
 * @param  粒子間の距離
 * @return 反発効果の勾配ベクトル
 */
PVector calcRepulsiveGrad(int i) {
  PVector gradR = new PVector(0, 0);
   // 隣接セルの粒子分だけ反発効果を計算
  for (int k = 0; k < neighborCount[i]; k++) {
    int j = neighbors[i][k];
    float dx = posX[i] - posX[j];
    float dy = posY[i] - posY[j];
    float d2 = dx * dx + dy * dy;
    // カットオフ距離内のみ計算
    if (d2 < 1e-12f || d2 >= repulsionCutoffSq) continue;
    float d = sqrt(d2);
    float w = (1 - d) / d;
    gradR.add(dx * w, dy * w);
  }
  return gradR.mult(-c_rep);
}

/**
 * 粒子iに対する成長効果の勾配∇G を計算
 *
 * @param  粒子間の距離
 * @return 成長効果の勾配ベクトル
 */
PVector calcGrowthGrad(int i) {
  float U = 0;
  PVector gradU = new PVector(0, 0);
  // 隣接セルの粒子分だけ成長効果を計算
  for (int k = 0; k < neighborCount[i]; k++) {
    int j = neighbors[i][k];
    float dx = posX[i] - posX[j];
    float dy = posY[i] - posY[j];
    float d2 = dx * dx + dy * dy;
    // カットオフ距離内のみ計算
    if (d2 < 1e-12f || d2 > kernelCutoffSq) continue;
    float d = sqrt(d2);
    float Kval = calcKernel(d);
    float Kp = calcKernelPrime(d);
    U += Kval;
    gradU.add(dx * (Kp/d), dy * (Kp/d));
  }
  float G = exp(-sq(U - mu_g) / sq(sigma_g));
  float dG = -2 * (U - mu_g) / sq(sigma_g) * G;
  return gradU.mult(dG);
}

// AABB(Axis‑Aligned Bounding Box)の構築
void computeBoundingBox() {
  // minを大きな値、maxを小さな値にすることで必ず更新される
  minX = minY =  Float.POSITIVE_INFINITY;
  maxX = maxY = -Float.POSITIVE_INFINITY;
  // AABBの構築
  for (int i = 0; i < N; i++) {
    minX = min(minX, posX[i]);
    minY = min(minY, posY[i]);
    maxX = max(maxX, posX[i]);
    maxY = max(maxY, posY[i]);
  }
  // そのままだと境界に粒子が配置されるため、1セル分余裕を持たせる
  float m = cellSize;
  minX -= m;
  minY -= m;
  maxX += m;
  maxY += m;
}

// Cell List構築
void buildGrid() {
  grid.clear();
  for (int i = 0; i < N; i++) {
    // 最小値を減算して正の整数化にすることで非負のセルインデックスを取得
    int gx = floor((posX[i] - minX) / cellSize);
    int gy = floor((posY[i] - minY) / cellSize);
    long key = packKey(gx, gy);
    grid.putIfAbsent(key, new ConcurrentLinkedQueue<>());
    grid.get(key).add(i);
  }
}

/**
 * Verlet List構築
 * 対象粒子からカットオフ距離以下である他粒子を近傍粒子リストに追加
 */
void buildVerletLists() {
  for (int i = 0; i < N; i++) {
    int count = 0; // 有効データ数
    float xi = posX[i];
    float yi = posY[i];
    // シミュレーション座標からCell List座標に変換
    int gx = floor((xi - minX) / cellSize);
    int gy = floor((yi - minY) / cellSize);

    // Moore近傍粒子のみ処理対象とする
    for (int dx = -1; dx <= 1; dx++) {
      for (int dy = -1; dy <= 1; dy++) {
        // セル内の粒子群を取得
        long key = packKey(gx + dx, gy + dy);
        ConcurrentLinkedQueue<Integer> cell = grid.get(key);
        if (cell == null) continue;
        for (int j : cell) {
          if (j == i) continue; // 自身は処理対象外
          float ddx = xi - posX[j];
          float ddy = yi - posY[j];
          // 実際の距離を使用しないため平方距離を比較
          if (ddx * ddx + ddy * ddy <= verletCutoff * verletCutoff) {
            neighbors[i][count++] = j; // 計算対象の近傍粒子のインデックスを取得
          }
        }
      }
    }
    // neighborsは固定長配列のため有効データ数を保持
    neighborCount[i] = count;
  }
}

/**
 * グリッド座標(gx, gy)を64ビットのキーにビットパック
 *
 * @param  グリッドのx座標
 * @param  グリッドのy座標
 * @return gxを上位32ビットgyを下位32ビットに詰め込んだ64ビット整数
 */
long packKey(int gx, int gy) {
  return (((long)gx) << 32) | ((long)gy & 0xffffffffL);
}

以下、最適化処理において重要なポイントに絞って解説します。


AABB(Axis‑Aligned Bounding Box)構築


// AABB(Axis‑Aligned Bounding Box)の構築
void computeBoundingBox() {
  // minを大きな値、maxを小さな値にすることで必ず更新される
  minX = minY =  Float.POSITIVE_INFINITY;
  maxX = maxY = -Float.POSITIVE_INFINITY;
  // AABBの構築
  for (int i = 0; i < N; i++) {
    minX = min(minX, posX[i]);
    minY = min(minY, posY[i]);
    maxX = max(maxX, posX[i]);
    maxY = max(maxY, posY[i]);
  }
  // そのままだと境界に粒子が配置されるため、1セル分余裕を持たせる
  float m = cellSize;
  minX -= m;
  minY -= m;
  maxX += m;
  maxY += m;
}

computeBoundingBox関数ではCell Listを正しく構築するために、座標変換とマージン付与を行なっています。


AABBのイメージ図
AABBのイメージ図

シミュレーション座標は初期化処理で-2,2の範囲で設定していました。Cell List座標においては全て非負のインデックス値としたいため変換を行っています。

また、全粒子の x,y 座標から最小・最大値を求め、境界にある粒子の割り当てもれを防ぐためにマージンを追加しています。

Cell List構築


void buildGrid() {
  grid.clear();
  for (int i = 0; i < N; i++) {
    // 最小値を減算して正の整数化にすることで非負のセルインデックスを取得
    int gx = floor((posX[i] - minX) / cellSize);
    int gy = floor((posY[i] - minY) / cellSize);
    long key = packKey(gx, gy);
    grid.putIfAbsent(key, new ConcurrentLinkedQueue<>());
    grid.get(key).add(i);
  }
}

buildGrid関数では全粒子を各セルに割り振ります。 セルに粒子を割り振る表現はConcurrentHashMapという並列処理を想定したHashMapを使用します。キーにセルインデックス、粒子をConcurrentLinkedQueueに格納していきます。 キーは重複しないようにpackKey関数で計算しています。


grid.putIfAbsent(key, new ConcurrentLinkedQueue<>());
grid.get(key).add(i);

この処理は最初にキーの存在判定を行い、存在しなければ新規の ConcurrentLinkedQueue を登録して、バリュー(粒子)を登録します。


Verlet List構築


/**
 * Verlet List構築
 * 対象粒子からカットオフ距離以下である他粒子を近傍粒子リストに追加
 */
void buildVerletLists() {
  for (int i = 0; i < N; i++) {
    int count = 0; // 有効データ数
    float xi = posX[i];
    float yi = posY[i];
    // シミュレーション座標からCell List座標に変換
    int gx = floor((xi - minX) / cellSize);
    int gy = floor((yi - minY) / cellSize);

    // Moore近傍粒子のみ処理対象とする
    for (int dx = -1; dx <= 1; dx++) {
      for (int dy = -1; dy <= 1; dy++) {
        // セル内の粒子群を取得
        long key = packKey(gx + dx, gy + dy);
        ConcurrentLinkedQueue<Integer> cell = grid.get(key);
        if (cell == null) continue;
        for (int j : cell) {
          if (j == i) continue; // 自身は処理対象外
          float ddx = xi - posX[j];
          float ddy = yi - posY[j];
          // 実際の距離を使用しないため平方距離を比較
          if (ddx * ddx + ddy * ddy <= verletCutoff * verletCutoff) {
            neighbors[i][count++] = j; // 計算対象の近傍粒子のインデックスを取得
          }
        }
      }
    }
    // neighborsは固定長配列のため有効データ数を保持
    neighborCount[i] = count;
  }
}

buildVerletLists関数はMoore近傍のみをチェックし、カットオフ内の粒子を隣接リストに追加します。このプログラムはコメントを詳細に記載しているので理解しやすいと思うので、 neighbors、neighborCount配列の目的についてのみ解説します。

配列の役割は以下の通りです。 neighbors:近傍粒子のインデックスを保持 neighborCount:粒子i毎の近傍粒子数を保持

neighbors、neighborCountは反発、成長勾配の計算で使用します。


for (int k = 0; k < neighborCount[i]; k++) {
  int j = neighbors[i][k];
  float dx = posX[i] - posX[j];
  float dy = posY[i] - posY[j];
  float d2 = dx * dx + dy * dy;

相互作用計算は近傍の粒子数分実施する必要があるので、neighborCountの要素数分ループします。実際のインデックス値はneighborsに格納されています。 そもそもArrayListを使用することで二つの配列を用意しなくて済みますが、固定長配列を使用することでオーバーヘッドを無くすことができるのでパフォーマンスが向上します。

勾配計算の並列化


/**
 * 粒子更新
 * ・Cell List + Verlet List
 * ・勾配計算、位置更新を並列化
 */
void updateParticles() {
  // 固定ステップでVerlet List構築
  if (stepCount % verletRebuildInterval == 0) {
    computeBoundingBox();
    buildGrid();
    buildVerletLists();
  }
  // 勾配計算、位置更新を並列化
  IntStream.range(0, N).parallel().forEach(i -> {
    PVector repGrad = calcRepulsiveGrad(i);           // ① 反発勾配 ∇R の計算
    PVector growthGrad = calcGrowthGrad(i);           // ② 成長勾配 ∇G の計算
    PVector gradE = PVector.sub(repGrad, growthGrad); // ③ 合成勾配 ∇E = ∇R - ∇G

    // ④ 新位置の計算
    newPosX[i] = posX[i] - gradE.x * dt;
    newPosY[i] = posY[i] - gradE.y * dt;
  });
  
  System.arraycopy(newPosX, 0, posX, 0, N);
  System.arraycopy(newPosY, 0, posY, 0, N);
}

Javaの並列ストリームAPIを使って一番コストがかかる計算処理を並列化しています。 parallel()を呼び出すことで並列モードとなります。この書き方は並列処理の仕組みをほぼ意識せずに使えるためかなり有用です。これはGPTに相談して教えてもらいました。 IntStream.range(0, N).parallel().forEach(i -> {  が並列化の記述方法になります。

まとめ

実際に最適化したことでどのくらい高速化したのか以下条件で検証してみました。

粒子数:1000個 updateParticles関数ループ数:1000回

最適化前:266秒 最適化後:16秒

約16倍ほど高速化していますね。ビジュアルは同じでもアルゴリズムを検討し直すとこれほど高速化されるとは恐ろしいですね。

僕がジェネラティブアートをしているのはビジュアルが面白いというのが一番にありますが、 早さに視点を置くことは、プログラムでアート製作をしているからこそできる別の楽しみでもありますよね。 これからは「プログラムだからこそ」という視点も今後の活動に取り入れていければと思います。 最後まで読んでいただきありがとうございました!

参考

תגובות


bottom of page