さて、数日前に、大規模疎行列のCRS,CCS構造体の話をし、次の日にはタプル要素間の任意値での比較用ファンクタ、そしてまた次の日にはstable_sortの本質と使い方について順を追って説明してきたね。
以上の流れで見てきた中で気付くのは、
CRSは、非ゼロ要素を、列IDでsortした後に、行IDでstable_sortしたものであり、
CCSは、非ゼロ要素を、行IDでsortした後に、列IDでstable_sortしたものである。
ということなんだね。そこで思わず考えてしまうのは、
非ゼロ要素情報のセットさえ持っていれば、CRSもCCSも、ソースコード上は上記の如くソート2回すれば構築することが出来きてしまうわけなのであり、しかもその処理たるや大して重いものでも何でもないので、あえてCRSやCCSという形式で情報を構築・保持しておく必要はない。つまり疎行列のデータ構造なんてものは、非ゼロ要素情報のセットさえ適当なコンテナに詰めて持っておくだけでよいのではないか?
と思ってWikipediaを見てみると、この非ゼロ要素情報のセットだけ持っておく方式にもCoordinateList(COO)という名前がついており、MATLABの内部でも採用されている方式だということだ。
なのでCOOで考えていこう、な、そうしよう。巨大疎行列を俺たちはCOOという形式で持ち、そしてそのためのアルゴリズムを考えていこうじゃないか。
まずはデータ構造だ。上記に述べたCOOのデータ構造としては以下のようなものが考えつくところだろう。
template< class FloatType=float > class SparseMatrix { public: typedef tuple< int, int, FloatType > TupType; public: POC_PROP( vector< TupType >, _elem_vec, ElemVec ); POC_PROP( int, _m, M ); POC_PROP( int, _n, N ); };
POC_PROPはこちらを見てくれ。そう、メンバ変数_elem_vecは、非ゼロ要素の「行ID,列ID、値」の、トリプレットをたくさん覚えておくためだけのただのコンテナだ。これだけでいい。デフォルトコンストラクタ、コピーコンストラクタとイコール=演算子も定義しておこう。
SparseMatrix::SparseMatrix( void ){ _m = _n = 0; } SparseMatrix::SparseMatrix ( const SparseMatrix& x ) { this->_m = x._m; this->_n = x._n; this->_elem_vec = x._elem_vec; } SparseMatrix SparseMatrix::operator=( const SparseMatrix& x ) { if( this == &x ) return *this; this->_m = x._m; this->_n = x._n; this->_elem_vec = x._elem_vec; return *this; }
この辺はお約束実装だよな。
次に要素を追加するメソッドが必要だ。
void SparseMatrix::AddElem( int i, int j, const FloatType& val ) { this->_elem_vec.push_back( make_tuple( i, j, val ) ); if( _m< (i+1) ) _m = i+1; if( _n< (j+1) ) _n = j+1; }
そう、ここでは追加リクエストのあった要素の位置指定から、行列のサイズを同時に更新しているね。これは全ての要素がゼロの、行または列が存在しないということを前提にしているよ。あとあえて禁止はしてないが、値ゼロを要素として登録するのは当然ルール違反とする。
さあ早速こいつを使っていこうじゃないか。たとえばこんな大きさ11×11の疎行列を考えよう。
2 -1 0 0 0 0 0 0 0 0 -1 -1 2 -1 0 0 0 0 0 0 0 0 0 -1 2 -1 0 0 0 0 0 0 0 0 0 -1 2 -1 0 0 0 0 0 0 0 0 0 -1 2 -1 0 0 0 0 0 0 0 0 0 -1 2 -1 0 0 0 0 0 0 0 0 0 -1 2 -1 0 0 0 0 0 0 0 0 0 -1 2 -1 0 0 0 0 0 0 0 0 0 -1 2 -1 0 0 0 0 0 0 0 0 0 -1 2 -1 -1 0 0 0 0 0 0 0 0 -1 2
どうかな11×11とはいえ、十分ゼロがたくさんあって、部分的に非ゼロ要素があるね。しかも対角成分は2で、その左右の値は-1という規則性があることもわかる。なのでこれを俺たちのSparseMatrixとして構築するコードは以下のようになるだろう。
SparseMatrix<> mat; typedef SparseMatrix<>::TupType TupType; const int n=11; for( int j=0; j<n; ++j ){ int jl = (j==0)? n-1: j-1; int jr = (j==n-1)? 0: j+1; mat.AddElem( j, jl, -1 ); mat.AddElem( j, j, 2 ); mat.AddElem( j, jr, -1 ); }// j
さあ、できた!…本当?本当に正しくできたか確認するための中身を表示するためのプログラムを書いてみよう。子供たちにはそのためのメソッド、SparseMatrix::Dump() を自分で考えて作ってみることをお勧めしておくよ。以下はおじさんが実装したものの結果だ。
よしよし、ちゃんとできているようだ。(ちなみにこれは数学的には1次元要素セットのラプラシアン演算子というものだよ、話の流れとはあまり関係ないけどね!)
さあ次に、SparseMatrixどうしの掛け算を行うメソッドを記述することを考えよう。行列の掛け算なので、AxB としたときに、Aの任意行の全要素と、Bの任意列の全要素とを、それぞれ列または行IDの小さい順に辿れるような状態にしたいよな。そのためには、AがCRS形式、BがCCS形式として表現されていると非常に便利そうだ。
さて、冒頭で考えたように非ゼロ要素セットをCRS形式、もしくはCCS形式として再編成する、あの考え方…はい、なのでそれは俺たちにとって見ればとても簡単なことで、それは以下のようになる。
void SparseMatrix::CompileCCS( void ){ std::sort( _elem_vec.begin(), _elem_vec.end(), CompTupleBy<0>() ); std::stable_sort( _elem_vec.begin(), _elem_vec.end(), CompTupleBy<1>() ); return; } void SparseMatrix::CompileCRS( void ){ std::sort( _elem_vec.begin(), _elem_vec.end(), CompTupleBy<1>() ); std::stable_sort( _elem_vec.begin(), _elem_vec.end(), CompTupleBy<0>() ); return; }
CompTupleByは、先日の日記を参照してほしい。
よし、ここまで揃ってしまえば、SparseMatrixどうしの掛け算を行うためのメソッドは、以下のように実装できるぞ。
void SparseMatrix::MultiMatrix( SparseMatrix& b ) { typedef vector< TupType >::const_iterator cIter; SparseMatrix a = *this; SparseMatrix c; c.setM( a.M() ); c.setN( b.N() ); a.CompileCRS();// 左のかけられる方はCRSでソート b.CompileCCS();// 右からかける方はCCSでソート // sigma for k( a[i][k] * b[k][j] ) k:[0, _m-1] const vector< TupType >& l = a.refElemVec(); const vector< TupType >& l2 = b.refElemVec(); // aの先頭から cIter it = l.begin(); while( it!=l.end() ){ // aの行を順に巡る cIter lb = it; cIter ub = upper_bound( it, l.end(), *it, CompTupleBy<0>() ); // bの先頭から cIter it2 = l2.begin(); while( it2 !=l2.end() ){ // bの列を順に巡る cIter ub2 = upper_bound( it2, l2.end(), *it2, CompTupleBy<1>() ); int i = get<0>(*it); int j = get<1>(*it2); float cij = 0; bool cij_changed =0; { // aのi行とbのj列の内積を、cijに求める。 for( ; it!=ub && it2!=ub2; ){ int k1 = get<1>(*it); int k2 = get<0>(*it2); if( k1 < k2 ){ ++it; }else if( k1 > k2 ){ ++it2; }else{ cij_changed = !0; cij += get<2>(*it) * get<2>(*it2); ++it; ++it2; } }// it } // cijがゼロでない可能性がある場合のみ // 出力マトリクスに要素{i, j, cij}を登録 if( cij_changed ) c.AddElem( i,j, cij ); it = lb; it2 = ub2; }// while it2 it = ub; }// while it *this = c; return; }
どうだろう、std::upper_bound()と、CompTupleBy が織りなすハーモニーがどれだけ便利かという感動をここでは噛みしめてほしい。
ではこれを使って実際に計算をしてみよう。使用側のコードは以下のようになるよ。
mat.MultiMatrix( mat ); mat.Dump();
そしてその結果は以下だ。
(ちなみにこれは数学的には1次元要素セットに対するラプラシアンのラプラシアン、即ちバイラプラシアン演算子というものだよ。話の流れ的にはあまり関係ないけどな)
これまで、n=11としてきたが、実にn=2560としても、おじさんのCore2DuoE8600マシンでリリースビルドしたものが、一瞬で帰ってくるぞ。一瞬というのはつまり、一秒もかからずに。
こんな土から起こした earth code でも正しいデータ構造と正しいアルゴリズムで実装すればこのパフォーマンスだ。いかがだろうか。
次回はこれを使っておじさんが何を計算したいのか、一体あんたは何をしたいのか何でこんなもの考えてんだという点にクローズアップしていくぞ。