MogileFSを読んでみる

brad productsはMemcachedに代表されるように非常に設計が素朴で、それでいて高性能ということで興味がわいたのでmogilefsdを中心にMogileFSソースコードを読んでみました。

まず、http://code.sixapart.com/svn/mogilefs/から最新版のコードをチェックアウトします。この中のdocや、CHANGLOG、devnotesは貴重な情報源なので目を通しておいた方が良いでしょう。

MogileFSはデータを入れた際に、そのデータのクラスのmin_devcountの数は最低でも複製されるようにします。その際にどのようにして、デバイスを選択しているかなどを中心に読んでみました。

まずmogilefsdを立ち上げると、それぞれ役割別のworkerが子プロセスとして立ち上がります。
どんなWorkerがあるかは、MogileFS::Worker以下をのモジュールを見ればわかります。

また、ps aux | grep mogなどとすると、プロセス名の右側にどの役割をしているかが表示されるので、どのプロセスがどのワーカーなのか確認することができます。各ワーカーのプロセス数は設定ファイルの~_jobsといった項目でそれぞれ設定することができます。

MogileFSには、Query, Delete, Replicate, Reaper, Monitor, FsckなどのWorkerがいます。
Queryには主にクライアントからネットワークを通して叩くコマンドが定義されています。
それぞれのコマンドはcmd_から始まるメソッドとして定義されています。

mogadmのコマンドや、MogileFS::Clientは実質ここで実装されているコマンドにリクエストを投げているのでクライアントはどの言語でも記述することができます。(RailsMogileFSを組み合わせてみたという話は日本ではそれほど聞かないですが、rubyのクライアントもSeatle.rbが作ってるみたいです。)

MogileFSの処理のコードを追う際は、まずここから追うのが良いでしょう。

Workerの構成はまずnewメソッドがあってここで初期化が行われています。
それから、workメソッドがあってここは全体がどのように走るかということが記述されています。workメソッドのループの中でprocess_lineメソッドが実行されていて、ループの中のメインの処理はprocess_lineに記述されています。

# MogileFS::Worker::Query
1257 sub cmd_replicate_now {
1258     my MogileFS::Worker::Query $self = shift;
1259 
1260     my $rv = Mgd::get_store()->replicate_now;
1261     return $self->ok_line({ count => int($rv) });
1262 }

で、replicationを始めるコマンドが定義されてます。

ここで出てくるMgdというのは、MogileFS::Server内で定義されています。

# MogileFS::Server
236 my ($store, $store_pid);
237 sub get_store {
238     return $store if $store && $store_pid == $$;
239     $store_pid = $$;
240     return $store = MogileFS::Store->new;
241 }

ここで出てくる$$はpidという名前と比較してるように実行中のプロセスIDです。処理はMogileFS::Storeに移ります。

1105 # enqueue a fidid for replication, from a specific deviceid (can be undef), in a      given number of seconds.
1106 sub enqueue_for_replication {
1107     my ($self, $fidid, $from_devid, $in) = @_;
1108 
1109     my $nexttry = 0;
1110     if ($in) {
1111         $nexttry = $self->unix_timestamp . " + " . int($in);
1112     }
1113 
1114     $self->insert_ignore("INTO file_to_replicate (fid, fromdevid, nexttry) ".
1115                          "VALUES (?,?,$nexttry)", undef, $fidid, $from_devid);
1116 }
1117 
1118 # reschedule all deferred replication, return number rescheduled
1119 sub replicate_now {1120     my ($self) = @_;
1121     return $self->dbh->do("UPDATE file_to_replicate SET nexttry = " . $self->unix     _timestamp .
1122                           " WHERE nexttry > " . $self->unix_timestamp);
1123 }

ここでは、file_to_replicateというDBのテーブルの中にあるファイルから現在の時刻よりも遅いものを現在行うようにデータを書き換えています。

ここで処理を見失ってしまいました。WorkerのReplicateというのがそれらしい名前をしているのでここを読むことにします。

まず大まかな流れをつかむためにworkメソッドを読みます。

# MogileFS::Worker::Replicate
 79     every(2.0, sub {
 80         $self->parent_ping;
 81         ...
 82         ...

まずeveryというのにworkメソッドのほぼすべてが囲まれています。everyはMogileFS::Utilの中で定義されていて、2.0秒に一回この中の処理が繰り返されます。

110         my $idle = 1;
111 
112         # this finds stuff to replicate based on its record in the needs_replicati    on table
113         $idle = 0 if $self->replicate_using_torepl_table;
114 
115         # this finds stuff to replicate based on the devcounts.  (old style)
116         if (MogileFS::Config->config("old_repl_compat")) {
117             $idle = 0 if $self->replicate_using_devcounts;
118         }
119 
120         # if replicators are otherwise idle, use them to make the world
121         # better, rebalancing things (if enabled), and draining devices (if
122         # any are marked drain)
123         if ($idle) {
124             $self->rebalance_devices;
125             $self->drain_devices;
126         }

ここでは、2つの方法でリプリケーションをさせようと試み、もしそのどちらもジョブがない状態であったときは、デバイスにおいてあるファイルの再配置を試みようとします。(データを各デバイスになるべく均等に割り振るために)

まずはreplicate_using_torepl_tableの方から読んでいきます。
まず、DBから複製候補の一覧を最大1000件読み込み、ランダムな順番にかき混ぜます。

2つ以上複製されていれば、飛ばして複製をします。

replicateの処理は484行目から始まっています。

実際の処理は、583行目あたりから始まってます。

582     my $rr;  # MogileFS::ReplicationRequest
583     while (1) {
584         $rr = rr_upgrade($polobj->replicate_to(
585                                                fid       => $fidid,
586                                                on_devs   => \@on_devs_tellpol, # a
    ll device objects fid is on, dead or otherwise
587                                                all_devs  => $devs,
588                                                failed    => \%dest_failed,
589                                                min       => $cls->mindevcount,
590                                                ));

$polobjというのはMogileFS::ReplicationPolicyのサブクラスのインスタンスで、このクラスの中でどのようなときに、どのホスト・デバイスに処理を割り振った方が良いかが書かれています。

651         my $worker = MogileFS::ProcManager->is_child or die;
652         my $rv = http_copy(
653                            sdevid       => $sdevid,
654                            ddevid       => $ddevid,
655                            fid          => $fidid,
656                            expected_len => undef,  # FIXME: get this info to pass     along
657                            errref       => \$copy_err,
658                            callback     => sub { $worker->still_alive; },
659                            );
660         die "Bogus error code: $copy_err" if !$rv && $copy_err !~ /^(?:src|dest)_e    rror$/;

の部分でmogstoredにリクエストを投げ、実際のファイルのコピーが行われているようです。

ようやくここまでで非常におおざっぱですが、入り口にたどり着いた感じです。
MogiileFSのコードを読んだときは、Rebalancingの仕組みを追うのが目的だったので、
次回は少し戻って再配置させる仕組みについて追っていこうかと思います。