C++によるMySQLの高速アクセス予備実験
CentOS+MySQL+Mysql++で,非常に激しいアクセスがあった場合のパフォーマンス向上を考えてみました.
作成したベンチマークプログラムは次のような感じ.
まず,基本データをデータベース内に作成するルーチン.
int database_reset(Connection &db_conn) { if(!db_conn.ping()) Abort("cannot connect for data preparation"); Query query = db_conn.query(); query<<"DROP TABLE bench"; query.execute(); query<<"CREATE TABLE bench (mykey INT NOT NULL, value VARCHAR(255) NOT NULL, PRIMARY KEY (mykey)) ENGINE = MYISAM;"; query.execute(); return(0); } int generate_data_table(vector<string> &table) { for(int key=0;key<MAX_ROW;++key) { string val="val:"; for(int j=0;j<rand()%31+1;++j) { val+= (char)(48+rand()%10); } table.at(key)=val; } return(0); } int insert_data_table(Connection &db_conn, vector<string> &table) { ostringstream ost; Query query = db_conn.query(); for(int key=0;key<MAX_ROW;++key) { query<<"INSERT INTO bench (mykey, value) VALUES ("<<key<<",'"<<table.at(key)<<"')"; ost<<"cannot insert test data("<<key<<","<<table.at(key)<<")"; if(!query.execute()) Abort(ost.str()); } return(0); }
こんな感じのデータが作成されます.
mysql> select * from bench; +-------+-----------------------------+ | mykey | value | +-------+-----------------------------+ | 0 | val:655217966 | | 1 | val:172 | | 2 | val:35287 | | 3 | val:19148 | | 4 | val:060 | | 5 | val:2 | | 6 | val:157 | | 7 | val:6342443 | | 8 | val:8841 | | 9 | val:2 | (以下略)
そしてこのデータの読み出し実験なのですが,まずは毎回,コネクションを張りに行くタイプ.
int read_single_use_reconnect(vector<string> &table) { time_t start = time(NULL); UseQueryResult results; int success = 0; for(int i=0;i<MAX_ROW;++i) { Connection db_conn("test","dbserver.example.com","root","",3306); if(!db_conn.ping()) Abort("cannot connect for benchmarking"); Query query = db_conn.query(); Row row; int key = rand()%MAX_ROW; query<<"SELECT SQL_NO_CACHE value FROM bench WHERE mykey="<<key; results = query.use(); if(results) { while(row = results.fetch_row()) { string value; row["value"].to_string(value); if(table.at(key)==value) ++success; } }else { cout<<"cannot fetch results for key["<<i<<"]"<<endl; cout<<"reason:"<<query.error()<<endl; } } time_t end = time(NULL); cout<<"success: "<<success<<" in "<<MAX_ROW<<" = "<<(double)success/MAX_ROW*100<<" % Ok."<<endl; cout<<"time = "<<(end-start)<<" sec."<<endl; return(0); }}
この実行時間は,こんな感じでした.
# ./a.out /***** Basic Database Construction *****/ time = 7 sec. Single Thread, Random Read, use(), Parallel Connections Reading Start... success: 50000 in 50000 = 100 % Ok. time = 47 sec. Reading End.
47秒かかっています.
次に,一度コネクションをはったら,そのコネクションオブジェクトをずっと使うタイプ.
int read_single_use(Connection &db_conn, vector<string> &table) { time_t start = time(NULL); UseQueryResult results; Query query = db_conn.query(); int success = 0; for(int i=0;i<MAX_ROW;++i) { if(!db_conn.ping()) Abort("cannot connect for benchmarking"); Row row; int key = rand()%MAX_ROW; query<<"SELECT SQL_NO_CACHE value FROM bench WHERE mykey="<<key; results = query.use(); if(results) { while(row = results.fetch_row()) { string value; row["value"].to_string(value); if(table.at(key)==value) ++success; } }else { cout<<"cannot fetch results for key["<<i<<"]"<<endl; cout<<"reason:"<<query.error()<<endl; } } time_t end = time(NULL); cout<<"success: "<<success<<" in "<<MAX_ROW<<" = "<<(double)success/MAX_ROW*100<<" % Ok."<<endl; cout<<"time = "<<(end-start)<<" sec."<<endl; return(0); }
結果は...
# ./a.out /***** Basic Database Construction *****/ time = 7 sec. Single Thread, Random Read, use(), Single Connection Reading Start... success: 50000 in 50000 = 100 % Ok. time = 15 sec. Reading End.
だいぶ速いですね.
さて,ここから無謀に,さきにコネクションを大量に張ってしまい,一気にクエリを送ると速くなるんじゃないかと考えました.
スレッド化して並列に送信すれば,爆速になるのではないかと.
そこで,コネクションオブジェクトをvectorクラスで大量に作成しておくバージョンを書いてみました.
int read_single_use_parallel_connect(vector<string> &table) { time_t start = time(NULL); vector<Connection> db_conn(MAX_ROW,Connection()); int j=0; for(vector<Connection >::iterator i=db_conn.begin();i!=db_conn.end();++i) { cout<<"connect ->"<<(j++)<<endl; i->connect("test","dbserver.example.com","root","",3306); if(!i->ping()) { cout<<i->error()<<endl; Abort("cannot connect for benchmarking"); } } UseQueryResult results; int success = 0; for(int i=0;i<MAX_ROW;++i) { if(!db_conn.at(i).ping()) { cout<<db_conn.at(i).error()<<endl; Abort("cannot connect for benchmarking"); } Query query = db_conn.at(i).query(); Row row; int key = rand()%MAX_ROW; query<<"SELECT SQL_NO_CACHE value FROM bench WHERE mykey="<<key; results = query.use(); if(results) { while(row = results.fetch_row()) { string value; row["value"].to_string(value); if(table.at(key)==value) ++success; } }else { cout<<"cannot fetch results for key["<<i<<"]"<<endl; cout<<"reason:"<<query.error()<<endl; } } time_t end = time(NULL); cout<<"success: "<<success<<" in "<<MAX_ROW<<" = "<<(double)success/MAX_ROW*100<<" % Ok."<<endl; cout<<"time = "<<(end-start)<<" sec."<<endl; return(0); }
...しかし,このプログラムは動きません.
# ./a.out /***** Basic Database Construction *****/ time = 7 sec. Single Thread, Random Read, use(), Parallel Connections Reading Start... connect ->0 セグメンテーション違反です
vectorで複数のオブジェクトを作ると,最初のオブジェクトをコピーコンストラクタでコピーして作られるので,結局同じオブジェクトが大量にできてしまうからです.
それを避けるには,独立したオブジェクトが作られるようにすればいいです.
time_t start = time(NULL); vector<Connection *> db_conn(MAX_ROW); // オブジェクト本体は作らず,ポインタを準備する int j=0; for(vector<Connection *>::iterator i=db_conn.begin();i!=db_conn.end();++i) { cout<<"connect ->"<<(j++)<<endl; *i = new Connection(false); // オブジェクトを作成 (*i)->connect("test","db7","root","",3306); if(!(*i)->ping()) { cout<<(*i)->error()<<endl; Abort("cannot connect for benchmarking"); } } UseQueryResult results; int success = 0; for(int i=0;i<MAX_ROW;++i) { if(!db_conn.at(i)->ping()) { cout<<db_conn.at(i)->error()<<endl; Abort("cannot connect for benchmarking"); } Query query = db_conn.at(i)->query();
すると,
# ./a.out /***** Basic Database Construction *****/ time = 8 sec. Single Thread, Random Read, use(), Parallel Connections Reading Start... connect ->0 connect ->1 connect ->2 connect ->3 connect ->4 (中略) connect ->1016 connect ->1017 connect ->1018 connect ->1019 Can't ping database server while disconnected ERROR:cannot connect for benchmarking
と,1000ちょいのコネクションが貼れました.
さすがに無茶すぎる気がするので,10本だけ張って使いまわしすることにしてみます.
time_t start = time(NULL); // 10本にする #define SMALL_MAX_ROW 10 vector<Connection *> db_conn(SMALL_MAX_ROW); int j=0; for(vector<Connection *>::iterator i=db_conn.begin();i!=db_conn.end();++i) { cout<<"connect ->"<<(j++)<<endl; *i = new Connection(false); (*i)->connect("test","dbserver.example.com","root","",3306); if(!(*i)->ping()) { cout<<(*i)->error()<<endl; Abort("cannot connect for benchmarking"); } } UseQueryResult results; int success = 0; for(int i=0;i<MAX_ROW;++i) { if(!db_conn.at(i%SMALL_MAX_ROW)->ping()) // 使いまわし { cout<<db_conn.at(i%SMALL_MAX_ROW)->error()<<endl; // 使いまわし Abort("cannot connect for benchmarking"); } Query query = db_conn.at(i%SMALL_MAX_ROW)->query(); // 使いまわし
結果は,
# ./a.out /***** Basic Database Construction *****/ time = 7 sec. Single Thread, Random Read, use(), Parallel Connections Reading Start... connect ->0 connect ->1 connect ->2 connect ->3 connect ->4 connect ->5 connect ->6 connect ->7 connect ->8 connect ->9 success: 50000 in 50000 = 100 % Ok. time = 15 sec. Reading End. [root@bhs1 optserver]#
速度は変わらないですね.
じゃあ,並列処理を導入してみましょう.
UseQueryResult results; int success = 0; // OpenMPを使う #pragma omp parallel for shared(db_conn) private(results) reduction(+:success) for(int i=0;i<MAX_ROW;++i) { if(!db_conn.at(omp_get_thread_num())->ping()) // スレッドごとに,別なコネクションを使う { cout<<db_conn.at(omp_get_thread_num())->error()<<endl; Abort("cannot connect for benchmarking"); } Query query = db_conn.at(omp_get_thread_num())->query();
結果は...
# export OMP_NUM_THREADS=10; ./a.out /***** Basic Database Construction *****/ time = 8 sec. Multi Threads, Random Read, use(), Parallel Connections Reading Start... connect ->0 connect ->1 connect ->2 connect ->3 connect ->4 connect ->5 connect ->6 connect ->7 connect ->8 connect ->9 success: 50000 in 50000 = 100 % Ok. time = 3 sec. Reading End.
はやっ!!!!!!!
結果をまとめますと,
アクセス方法 | アクセス時間 |
マルチコネクション+OpenMPマルチスレッド並列実行 | 約3秒 |
マルチコネクション+シングルスレッドシーケンシャル実行 | 約15秒 |
毎回シングルコネクション+シングルスレッドシーケンシャル実行 | 約46秒 |
シングルコネクション使い続け+シングルスレッドシーケンシャル実行 | 約14秒 |
とにかく,毎回コネクションを張るのって,とっても時間がかかるんですね.
そしてOpenMP,ほんの数行の書き換えでスレッド処理ができるのって,やっぱり素晴らしいです.
これがpthreadだったら,スレッド作成やらデタッチやらスレッドIDの認識やら,いろいろ大変です.