lock-free stackと並行アルゴリズムの区分

この記事は
カーネルVM Advent Calendar http://atnd.org/events/10701
のために書かれました。

これまで複数回に渡ってlock-freeデータ構造を紹介して来ましたが
そもそもの前提を話していなかったり目的も不明だったりと不備だった点があったので
根元から一度おさらいしてみたいと思います。


まずロックを用いる事の欠点から

上図のような構図でロックによる相互排他を行うと様々な問題が発生します。
具体的に言うと排他に成功したスレッドに様々な災難が降りかかります。

主な事例として

↑ロック確保できたのにOSによってプリエンプションされる。


↑物理メモリに乗ってない仮想メモリにアクセスしてしまった。


↑キャッシュミスヒットによるメモリ待ち。

そんなに気にするほどのパフォーマンス低下ではないと思うかも知れませんが
マルチコアの方向へ舵を切った新世代CPUを使いこなすに当たっては重大なスループット低下を招きます。



ロック内のスレッドが無駄にした1クロックは

ロック外のすべてのスレッドにとっての1クロックでもあるのです。



至高のパフォーマンスを目指す近代のアルゴリズマー達はこの問題を決して野放しにはしませんでした。

その対策の一部がこれから紹介するnon-blockingデータ構造の数々です。

google mockで快適テスト生活

googleにより公開されているgoogle mockが使いやすかったので使い方紹介。

http://code.google.com/p/googlemock/
から落としてペペっと入れましょう。

使いたいシーンとしては、充分モジュール化されたオブジェクト指向プログラム中で

int main(){
  A a;
  B b;
  func(&a, &b);  // a->hoge(1)  と  b->fuga(2,3) が呼ばれて欲しい!
}

この様に特定の関数が狙った通りの動作をして欲しい(というか、その動作をすることを仕様化したい場合)

#include <gtest/gtest.h>
#include <gmock/gmock.h>
class mock_A{
public:
  MOCK_METHOD1(hoge,void(int));  // void hoge(int);と同義
};
class mock_B{
public:
  MOCK_METHOD2(fuga,void(int,int));  // void fuga(int,int);と同義
};

と書いて予め呼び出すシグネチャを定義しておき、gtestのテスト本体で

TEST(func, will_call_hoge_and_fuga){
  mock_A a;
  mock_B b;

  // 呼ばれるべき関数を規定
  EXPECT_CALL(a, hoge(1));
  EXPECT_CALL(b, fuga(1,2));

  // 実際に関数を呼んでみる
  func(&a, &b); // a->hoge(1), b->fuga(1,2)の両方が呼ばれたらテスト成功
}

のように書く事でテストを作れます。
ノリとしてはジョジョ2部の「次にお前は○○と言う!!」みたいな感じですね。


関数がそもそもA,Bに対応しているのに対し、mock_Aとmock_Bを引数に取らせてテストをすることになるため
funcはtemplateで記述する必要があります。

template <typename A, typename B>
void func(A* a, B* b){
   ...
}

templateだとコンパイル時間が膨れ上がるようになるので継承でどうにかすることもできます。

// Aもmock_Aもこれらのクラスを継承する。
struct base_A{
  virtual void hoge(int) = 0;
  ......
};
struct base_B{
  virtual void fuga(int,int) = 0;
  ......
};

// baseをターゲットにする
void func(base_A* a, base_B* b);

動的にディスパッチされるのでコンパイル時の負担が減る…はずです。


実際にそれなりに便利に使ってるコードは
http://github.com/kumagi/skipgraph/blob/master/logic_test.cc
で公開しています。
skip graphの各ノードの挙動を規定してその動作の通りになっている事をチェックさせています。
まったく同一のコードを実際のノード上で別のtemplate引数で展開するので安心です。

以下、細かいTips

続きを読む

for_eachで平均・分散を求める

配列内のデータの平均・分散を算出するには通常以下のようなプログラムを書く

double avg = 0.0;
for(size_t i=0; i<data.size(); i++){ avg += data[i];} // 合計を計算
avg /= data.size(); // 総数で割って平均値
double var = 0.0;
for(size_t i=0; i<data.size(); i++)
   { var += (data[i] - avg)*(data[i] - avg);} // 平均との差分を2乗
var /= data.size(); // 総数で割って分散値

これで読みやすい人はそれで良いけども、何も知らない人にはちょっと読みにくい。
そこで以下のような書き方を推奨。
まず

class calc_var{
  const double avg;
  double sum;
public:
  calc_var(const double& a):avg(a),sum(0.0){}
  inline void operator()(const double& dat){
    double diff = dat-avg;
    sum+=diff*diff;
  }
  const double& get()const{ return sum;}
};

このように平均値をセットして差分の二乗を合計する関数オブジェクトを用意しておいて

#include <numeric>
#include <algorithm>
// 平均値はnumeric内のaccumulateを使う
const double avg = std::accumulate(data.begin(),data.end(),0.0) / data.size();

// 分散値はaccumulateでは出来ないのでfor_eachを使う
const double var = std::for_each(data.begin(), data.end(), calc_var(avg)).get() / data.size();

for_eachで関数オブジェクトを平均値付きで渡す。
for_eachは「範囲内の要素全てに関数を適用する」という意味では直感的だけど、そこから戻り値を獲得する為にはひと工夫必要。

for_eachの戻り値は「範囲内の要素全てを適用したあとの関数オブジェクト」なので、その戻り値自体をクラスとしてメンバ関数を呼び出せる。
ここではfor_eachで合計値を算出させ、合計結果を獲得するメンバ関数getを用意している。
述語として渡す関数はステートレスな物が推奨されているけれど、この場合は例外?

(ネタがだんだんtrivialになっていく

Split-ordered linked list: lock free hash table

ロックを使わず共有できるハッシュテーブル
http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.100.7132&rep=rep1&type=pdf
の論文のスライド資料を作りました。

Hashmapは大きく分けてオープンアドレッシングとクローズドアドレッシングの2種類に分かれ、こちらは後者です。
Linear-probingなどを行う物と異なり、ハッシュ値が必ず1つのバケットのみと対応するからでしょうか?

Lock-free linked-listを利用することでバケット内部での競合を解決すると共にバケットの拡張でお互いを邪魔することもありません。

trivialだけど同症状で困ってる数少ない方向け
C++の有名ライブラリboostには静的リンクするものと動的リンクするものがあります。

program_optionsはプログラム起動時に与えられた引数のparseを一手に引き受けて、ヘルプ作成までやってくれる優れものですが、動的リンクするタイプのライブラリです。概要や使いかたはlet's boostを参照。
http://www.kmonos.net/alang/boost/classes/program_options.html
動的リンクなのでコンパイルには

g++ hoge.o fuga.o -lboost_program_options

のようにオプションを付ける必要があります。

古いboostからバージョンアップした際に、/usr/lib/や/usr/lib64/内部にあるDLLをアップデートし損ねているとヘッダファイルだけバージョンアップされて、バイナリが古いままなのでコンパイルが通りません。
そしてこのようなエラーメッセージを拝むことになります。
(このメッセージはboost-1.33→1.43にアップデートした場合に出た物)

undefined reference to `boost::program_options::options_description::m_default_line_length'
`boost::program_options::basic_command_line_parser::extra_parser(boost::function1, std::allocator >, std::basic_string, std::allocator > >, std::basic_string, std::allocator > const&>)':
...

なので/usr/lib内や/usr/lib64内部を手動でアップデートさせます(自動で行う方法もあると思いますが
動的リンクさせるべきDLLファイルはboostの入っていたディレクトリからstage/lib/に一式入っています

# rm /usr/lib/libboost_program_options.so
# rm /usr/lib64/libboost_program_options.so
# cp /(boostのディレクトリ)/stage/lib/libboost_program_options.* /usr/lib/
# cp /(boostのディレクトリ)/stage/lib/libboost_program_options.* /usr/lib64/

本来は普通にインストールした時点で動的リンクも全て最新の物になっているべきですが、何らかの理由でアップデートされなかった場合など。

KVS上に実装するStack

CASがあればCAS依存のデータ構造が実装出来る第二弾です。
今度はLockfreeの中でも簡単なデータ構造なスタックです。
前回の疑似コードは微妙だったのでもうちょっとpythonらしい文法に直しました。

import memcache // python-memcached使用
import string
from random import randrange

alphabets = string.digits + string.letters

// ランダム文字列を作る関数
def random_string(n):
    return ''.join(random.choice(alphabets) for i in xrange(n))

// KVS上に実装するスタック
class kvs_stack(object):
	def __init__(self,_server):
		self.client = memcache.Client(_server, debug=0) // memcachedサーバに接続
		head = random_string(8)  // この値をクライアント同士で共有する
		self.client.add(head,'')  // 初めのスタックは空

	def push(self, value_):
		newvalue = value_.join(random_string(8)) // ランダム文字列を末尾に付けて一意性を確保
		self.client.add(newvalue,'')
		while(1):
			old_head = self.client.gets(self.head)  // headが指す先がstackの先頭
			self.client.replace(newvalue,old_head)  // 新規アイテムがそれを指すように書き換え
			
			result = self.client.cas(self.head, newvalue)  // 新規アイテムを指すようにhead差し替え
			if(result == 0): // 失敗したらnewvalueの指す先を書き換える所からやり直し
				continue
			else:  // 成功
				break 

	def pop(self, value_):
		while(1):
			old_head = self.client.gets(self.head) 
			if(old_head == ''):  // スタックが空の場合なら空文字を返す
				return ''
			next_head = self.client.get(old_head)  // headが次に指すべき値を探す
			
			result = self.client.cas(self.head, next_head)  // headを差し替える
			if(result == 0):
				continue  // 失敗したならやり直し
			else:
				value_ = old_head[0:-8]  // 後ろに取り付けた乱数列を削って
				self.client.delete(old_head)  // KVS上から該当項目を消して
				break  // 成功

前回のFIFOでも言えることなんですが
・addは万一ダブって失敗した場合乱数の生成からやり直し
・保存する対象はmemcahedプロトコルに干渉する内容の物は不可
です。恐らくどちらも改善可能な部類ですが今回は説明のため簡単にしました。

次は…双方向dequeでしょうか…?