C++によるMySQLの高速アクセス予備実験

CentOS+MySQL+Mysql++で,非常に激しいアクセスがあった場合のパフォーマンス向上を考えてみました.
作成したベンチマークプログラムは次のような感じ.
まず,基本データをデータベース内に作成するルーチン.

int database_reset(Connection &db_conn)
{
  if(!db_conn.ping()) Abort("cannot connect for data preparation");
  Query query = db_conn.query();
  query<<"DROP TABLE bench";
  query.execute();
  query<<"CREATE TABLE bench (mykey INT NOT NULL, value VARCHAR(255) NOT NULL, PRIMARY KEY (mykey)) ENGINE = MYISAM;";
  query.execute();
  return(0);
}
int generate_data_table(vector<string> &table)
{
  for(int key=0;key<MAX_ROW;++key)
  {
    string val="val:";
    for(int j=0;j<rand()%31+1;++j)
    {
       val+= (char)(48+rand()%10);
    }
    table.at(key)=val;
  }
  return(0);
}
int insert_data_table(Connection &db_conn, vector<string> &table)
{
  ostringstream ost;
  Query query = db_conn.query();
  for(int key=0;key<MAX_ROW;++key)
  {
    query<<"INSERT INTO bench (mykey, value) VALUES ("<<key<<",'"<<table.at(key)<<"')";
    ost<<"cannot insert test data("<<key<<","<<table.at(key)<<")";
    if(!query.execute()) Abort(ost.str());
  }
  return(0);
}

こんな感じのデータが作成されます.

mysql> select * from bench;
+-------+-----------------------------+
| mykey | value                       | 
+-------+-----------------------------+ 
|     0 | val:655217966               |
|     1 | val:172                     |
|     2 | val:35287                   |
|     3 | val:19148                   |
|     4 | val:060                     |
|     5 | val:2                       |
|     6 | val:157                     |
|     7 | val:6342443                 |
|     8 | val:8841                    |
|     9 | val:2                       |  
(以下略)

そしてこのデータの読み出し実験なのですが,まずは毎回,コネクションを張りに行くタイプ.

int read_single_use_reconnect(vector<string> &table)
{
  time_t start = time(NULL);
  UseQueryResult results;
  int success = 0;
  for(int i=0;i<MAX_ROW;++i)
  {
    Connection db_conn("test","dbserver.example.com","root","",3306);
    if(!db_conn.ping()) Abort("cannot connect for benchmarking");
    Query query = db_conn.query();
    Row row;
    int key = rand()%MAX_ROW;
    query<<"SELECT SQL_NO_CACHE value FROM bench WHERE mykey="<<key;
    results = query.use();
    if(results)
    {
      while(row = results.fetch_row())
      {
        string value;
        row["value"].to_string(value);
        if(table.at(key)==value)  ++success;
      }
    }else
    {
      cout<<"cannot fetch results for key["<<i<<"]"<<endl;
      cout<<"reason:"<<query.error()<<endl;
    }
  }
  time_t end = time(NULL);
  cout<<"success: "<<success<<" in "<<MAX_ROW<<" = "<<(double)success/MAX_ROW*100<<" % Ok."<<endl;
  cout<<"time = "<<(end-start)<<" sec."<<endl;
  return(0);
}}

この実行時間は,こんな感じでした.

# ./a.out
/***** Basic Database Construction *****/
time = 7 sec.
Single Thread, Random Read, use(), Parallel Connections
Reading Start...
success: 50000 in 50000 = 100 % Ok.
time = 47 sec.
Reading End.

47秒かかっています.

次に,一度コネクションをはったら,そのコネクションオブジェクトをずっと使うタイプ.

int read_single_use(Connection &db_conn, vector<string> &table)
{
  time_t start = time(NULL);
  UseQueryResult results;
  Query query = db_conn.query();
  int success = 0;
  for(int i=0;i<MAX_ROW;++i)
  {
    if(!db_conn.ping()) Abort("cannot connect for benchmarking");
    Row row;
    int key = rand()%MAX_ROW;
    query<<"SELECT SQL_NO_CACHE value FROM bench WHERE mykey="<<key;
    results = query.use();
    if(results)
    {
      while(row = results.fetch_row())
      {
        string value;
        row["value"].to_string(value);
        if(table.at(key)==value)  ++success;
      }
    }else
    {
      cout<<"cannot fetch results for key["<<i<<"]"<<endl;
      cout<<"reason:"<<query.error()<<endl;
    }
  }
  time_t end = time(NULL);
  cout<<"success: "<<success<<" in "<<MAX_ROW<<" = "<<(double)success/MAX_ROW*100<<" % Ok."<<endl;
  cout<<"time = "<<(end-start)<<" sec."<<endl;
  return(0);
}

結果は...

# ./a.out
/***** Basic Database Construction *****/
time = 7 sec.
Single Thread, Random Read, use(), Single Connection
Reading Start...
success: 50000 in 50000 = 100 % Ok.
time = 15 sec.
Reading End.

だいぶ速いですね.

さて,ここから無謀に,さきにコネクションを大量に張ってしまい,一気にクエリを送ると速くなるんじゃないかと考えました.
スレッド化して並列に送信すれば,爆速になるのではないかと.
そこで,コネクションオブジェクトをvectorクラスで大量に作成しておくバージョンを書いてみました.

int read_single_use_parallel_connect(vector<string> &table)
{

  time_t start = time(NULL);
  vector<Connection> db_conn(MAX_ROW,Connection());
  int j=0;
  for(vector<Connection >::iterator i=db_conn.begin();i!=db_conn.end();++i)
  {
    cout<<"connect ->"<<(j++)<<endl;
    i->connect("test","dbserver.example.com","root","",3306);
    if(!i->ping())
    {
      cout<<i->error()<<endl;
      Abort("cannot connect for benchmarking");
    }
  }  UseQueryResult results;
  int success = 0;
  for(int i=0;i<MAX_ROW;++i)
  {
    if(!db_conn.at(i).ping())
    {
      cout<<db_conn.at(i).error()<<endl;
      Abort("cannot connect for benchmarking");
    }
    Query query = db_conn.at(i).query();
    Row row;
    int key = rand()%MAX_ROW;
    query<<"SELECT SQL_NO_CACHE value FROM bench WHERE mykey="<<key;
    results = query.use();
    if(results)
    {
      while(row = results.fetch_row())
      {
        string value;
        row["value"].to_string(value);
        if(table.at(key)==value)  ++success;
      }
    }else
    {
      cout<<"cannot fetch results for key["<<i<<"]"<<endl;
      cout<<"reason:"<<query.error()<<endl;
    }
  }
  time_t end = time(NULL);
  cout<<"success: "<<success<<" in "<<MAX_ROW<<" = "<<(double)success/MAX_ROW*100<<" % Ok."<<endl;
  cout<<"time = "<<(end-start)<<" sec."<<endl;
  return(0);
}

...しかし,このプログラムは動きません.

# ./a.out
/***** Basic Database Construction *****/
time = 7 sec.
Single Thread, Random Read, use(), Parallel Connections
Reading Start...
connect ->0
セグメンテーション違反です

vectorで複数のオブジェクトを作ると,最初のオブジェクトをコピーコンストラクタでコピーして作られるので,結局同じオブジェクトが大量にできてしまうからです.
それを避けるには,独立したオブジェクトが作られるようにすればいいです.

  time_t start = time(NULL);
  vector<Connection *> db_conn(MAX_ROW);  // オブジェクト本体は作らず,ポインタを準備する
  int j=0;
  for(vector<Connection *>::iterator i=db_conn.begin();i!=db_conn.end();++i)
  {
    cout<<"connect ->"<<(j++)<<endl;
    *i = new Connection(false);  // オブジェクトを作成
    (*i)->connect("test","db7","root","",3306);
    if(!(*i)->ping())
    {
      cout<<(*i)->error()<<endl;
      Abort("cannot connect for benchmarking");
    }
  }
  UseQueryResult results;
  int success = 0;
  for(int i=0;i<MAX_ROW;++i)
  {
    if(!db_conn.at(i)->ping())
    {
      cout<<db_conn.at(i)->error()<<endl;
      Abort("cannot connect for benchmarking");
    }
    Query query = db_conn.at(i)->query();

すると,

# ./a.out
/***** Basic Database Construction *****/
time = 8 sec.
Single Thread, Random Read, use(), Parallel Connections
Reading Start...
connect ->0
connect ->1
connect ->2
connect ->3
connect ->4
(中略)
connect ->1016
connect ->1017
connect ->1018
connect ->1019
Can't ping database server while disconnected
ERROR:cannot connect for benchmarking

と,1000ちょいのコネクションが貼れました.
さすがに無茶すぎる気がするので,10本だけ張って使いまわしすることにしてみます.

  time_t start = time(NULL);
// 10本にする
#define SMALL_MAX_ROW  10
  vector<Connection *> db_conn(SMALL_MAX_ROW);
  int j=0;
  for(vector<Connection *>::iterator i=db_conn.begin();i!=db_conn.end();++i)
  {
    cout<<"connect ->"<<(j++)<<endl;
    *i = new Connection(false);
    (*i)->connect("test","dbserver.example.com","root","",3306);
    if(!(*i)->ping())
    {
      cout<<(*i)->error()<<endl;
      Abort("cannot connect for benchmarking");
    }
  }
  UseQueryResult results;
  int success = 0;
  for(int i=0;i<MAX_ROW;++i)
  {
    if(!db_conn.at(i%SMALL_MAX_ROW)->ping()) // 使いまわし
    {
      cout<<db_conn.at(i%SMALL_MAX_ROW)->error()<<endl; // 使いまわし
      Abort("cannot connect for benchmarking");
    }
    Query query = db_conn.at(i%SMALL_MAX_ROW)->query(); // 使いまわし

結果は,

# ./a.out
/***** Basic Database Construction *****/
time = 7 sec.
Single Thread, Random Read, use(), Parallel Connections
Reading Start...
connect ->0
connect ->1
connect ->2
connect ->3
connect ->4
connect ->5
connect ->6
connect ->7
connect ->8
connect ->9
success: 50000 in 50000 = 100 % Ok.
time = 15 sec.
Reading End.
[root@bhs1 optserver]#

速度は変わらないですね.

じゃあ,並列処理を導入してみましょう.

  UseQueryResult results;
  int success = 0;
// OpenMPを使う
#pragma omp parallel for shared(db_conn) private(results) reduction(+:success)
  for(int i=0;i<MAX_ROW;++i)
  {
    if(!db_conn.at(omp_get_thread_num())->ping()) // スレッドごとに,別なコネクションを使う
    {
      cout<<db_conn.at(omp_get_thread_num())->error()<<endl;
      Abort("cannot connect for benchmarking");
    }
    Query query = db_conn.at(omp_get_thread_num())->query();

結果は...

# export OMP_NUM_THREADS=10; ./a.out
/***** Basic Database Construction *****/
time = 8 sec.
Multi Threads, Random Read, use(), Parallel Connections
Reading Start...
connect ->0
connect ->1
connect ->2
connect ->3
connect ->4
connect ->5
connect ->6
connect ->7
connect ->8
connect ->9
success: 50000 in 50000 = 100 % Ok.
time = 3 sec.
Reading End.

はやっ!!!!!!!

結果をまとめますと,

アクセス方法アクセス時間
マルチコネクション+OpenMPマルチスレッド並列実行約3秒
マルチコネクション+シングルスレッドシーケンシャル実行約15秒
毎回シングルコネクション+シングルスレッドシーケンシャル実行約46秒
シングルコネクション使い続け+シングルスレッドシーケンシャル実行約14秒
となりました.
とにかく,毎回コネクションを張るのって,とっても時間がかかるんですね.
そしてOpenMP,ほんの数行の書き換えでスレッド処理ができるのって,やっぱり素晴らしいです.
これがpthreadだったら,スレッド作成やらデタッチやらスレッドIDの認識やら,いろいろ大変です.