見出し画像

ひと工夫で高速になるテク

はじめに

みなさんこんにちは。ALH開発事業部のREIYAです。
今回はちょっとの工夫で処理を高速化できるテクを紹介します。


概要

業務で性能改善が必要な場合どうやって高速化・省メモするかを考えると思いますが
多くの場合クエリ改善であったりDBに関する何かしらのチューニングを行うことになると思います。
本記事ではそういった改善の背景に何があるのかを解釈し、高速化するためのひと工夫をみていくことにします。
DBの仕組みを解説するわけではありません。
※要所の解説を割愛しています。あくまで紹介目的なのであしからず

要約

  • 準備をしておく

  • データ構造を活用する

  • 状況に見合った手法を用いる

準備をしよう

まずは適切に準備をすることが大切な例を紹介します。次のような状況を考えてみましょう。


あなたは本国のとある大企業に勤めています。ある日上司からシステム開発を一任されてしまいました!
「国民全員の貯金データがある。◯円〜◯円までの人は何人いるのかを検索できるシステムを作りなさい。」
「ただし多くの人が使うためなるべく高速にしてほしい。」
「性能要件は100万件で100万回の検索を2秒以内とする」


本記事では

  • なんで貯金データがあるんだ

  • 個人情報漏れすぎててやばい

  • ただのSQL書いて終わらない?

  • 同接と複数回処理は別動作なので性能要件が不実

などの鋭いツッコミを受け付けております。
さて、より本質的に以下のように問題を言い換えます。
貯金額は10億円が最大としましょう。うらやましいですね。

$${N}$$個の数$${A}$$にて$${10^6}$$回、$${a_i}$$以上$${b_i}$$以下な要素の個数を出す
$${1 \le N \le 10^{6}}$$
$${1 \le A_i,a_i,b _i \le 10^{9}}$$ $${A_i,a_i,b_i \in \mathbb{Z}}$$

みなさんは以下のようなクエリを思いつくでしょうか。

SELECT COUNT(v) FROM A WHERE v BETWEEN {a} AND {b};

本記事ではこういった処理を言語側で行うことを考えてみます。

まずは愚直に

N = int(input())
A = [int(x) for x in input().split()]
Q = int(input()) # 1e6で確認したくないので受け取るようにしておきます
for _ in range(Q):
  a, b = map(int, input().split())
  print(sum([a <= v and v <= b for v in A]))

普通に調べるならこういったコードになるでしょう。
例えば以下のようなデータを与えてみると

10
4 2 3 2 3 1 3 3 4 1
2
2 2
2 3

2, 6と改行区切りで出力が得られます。
2円以上2円以下の人は2人、2円以上3円以下の人は6人います。
しかしこのままだとQ周中でN周するので最大$${10^{12}}$$ループすることになり、多くの場合2秒では処理が終わらないでしょう。
これでは上司に怒られてしまいます。こわい。

誤解しないで欲しいのが、必ずしもこの処理が悪いわけではありません
データ数が1000件程度であれば、むしろこれで良いのです。
100万データもあることが問題なのです。社会が悪い。

適切な準備1

怒られないために適切な準備をしておきましょう。
往々にしてこのような更新されないデータ$${A}$$にはなんらかの準備を施しておくことで、$${Q}$$回のクエリを高速化できる場合があります。
今回は$${A}$$をあらかじめソートしておくことで二分探索が使えます。二分探索の説明は省略します。
ところで$${a}$$以上$${b}$$以下である要素の個数を調べるには

$$
b以下の要素数 - a未満の要素数
$$

を調べれば良いので2回二分探索するだけで欲しい人数が分かります。
初めてb以下になるのは何個目?ということを2回しただけです。
これにより計算量が$${O(QlogN)}$$となり最悪でもループ数は$${10^{7}}$$程度で済みます。一般にこれは2秒以内で終わります。
またソートも$${O(NlogN)}$$で済みますので、事前準備も高速な上に毎回の検索も高速というとても良いプログラムになりますね。

#include <iostream>
#include <vector>
#include <algorithm>
using namespace std;
int main() {
    int N; cin >> N;
    vector<int> A(N); for (int &v : A) cin >> v;
    sort(A.begin(), A.end());
    int Q; cin >> Q;
    while (Q--) {
        int a, b; cin >> a >> b;
        int ika = upper_bound(A.begin(), A.end(), b) - A.begin();
        int miman = lower_bound(A.begin(), A.end(), a) - A.begin();
        cout << ika - miman << endl;
    }
}

適切な準備2

今回は二分探索が有効だったのでソートという準備をしましたが、状況によっては別の準備が有効な時もあります。
貯金額が最大100万とそこまで大きくない(!?)場合を考えてみます。
ちょっとした工夫として
配列$${B}$$を$${index}$$に金額、値に人数を持つように作成してみます。
貯金額が大きかった時は配列の要素数が多すぎて無理な芸当でした。

vector<int> B(1e6 + 1);
for (auto &&v : A) ++B[v];

これにより$${B_i}$$には$${i}$$円貯金がある人数が格納されるので
$${a}$$円以上$${b}$$円以下の人数は$${\sum B_{[a,b]}}$$となります。
すなわち配列の区間和を求たいのでゼータ変換しましょう。説明は省略します。

vector<int> Z(1e6 + 2);
for (int i = 0; i <= 1e6; ++i) Z[i + 1] = Z[i] + B[i];

このように累積和配列を準備しておくことで、$${Q}$$回の処理は

int cnt = Z[b + 1] - Z[a];

で済みます。
前計算$${O(N)}$$のクエリ$${O(1)}$$とよい計算量が得られました。
ですが無駄にメモリを使っている感がありますね。結局のところ制約次第ということです。
ひとまずこれで上司に提出しましょう。

データ構造を活用しよう


上司「ちょっとまって。国民の貯金が変わらない前提で作ったね?」
あなた「はい、要件にないので。」
上司「貯金額は変わるに決まってるだろ!」
あなた「最初に言ってもらわないと…」


なんということでしょう、誇張が過ぎますがSE業界こんなんばっかですね。
しかし困りました。前述の準備しよう戦法では

  • ソート: 配列の一部分を更新するたび全体をソートしなおす必要

  • 累積和: 同様にゼータ変換しなおす必要

があり、
例えば$${Q}$$回「貯金額の更新 + 検索」を行うとすると、更新のたびに再計算が必要となってしまいます。
これでは準備の意味がなくなってしまっていますね。報連相はしっかりしましょう。

適切なデータ構造1

さて、ソートを毎回やり直さなくても良くなるデータ構造があります。
多くの言語でsetと命名されている、順序付き集合です。
(内部的には平衡二分木、AVL木等)
今回は同じ貯金額の人が複数いることがあるので順序付き多重集合を用いましょう。

#include <iostream>
#include <set>
#include <vector>
using namespace std;
int main() {
    int N; cin >> N;
    vector<int> A(N); for (auto &v : A) cin >> v;
    multiset<int> st(A.begin(), A.end());
    int Q; cin >> Q;
    while (Q--) {
        int a, b; cin >> a >> b;
        int ika = distance(st.begin(), st.upper_bound(b));
        int miman = distance(st.begin(), st.lower_bound(a));
        cout << ika - miman << endl;
    }
}

更新は以下のようにすればよいですね

// 貯金額aをbに更新する場合
st.erase(st.find(a));
st.insert(b);

1回の更新の計算量は$${O(logN)}$$で取得も同様ですので、ソート準備戦法と同じ時間で動くようになります。

適切なデータ構造2

同様に、更新するたび区間和の再計算が必要ないデータ構造を使いたいですがこれにはFenwickTree、SegmentTreeなどがあります。説明略。
もちろん貯金額が大きい場合は配列サイズが爆発しちゃうので、準備戦法の時と同じ100万円までの制約とします。(一応貯金額が大きくても、座標圧縮と変換結果を用いるなどしてやればできないこともないです。)
例えば以下のようにすることで、構築$${O(NlogN)}$$のクエリ$${O(logN)}$$となります。
※データ構造を活用することで高速化できるという事例を紹介したかっただけですので、読み飛ばしていただいて構いません。

#include <iostream>
#include <vector>
using namespace std;
int main() {
    int N; cin >> N; int M = 1e6 + 1;
    vector<int> A(N); for (auto &v : A) cin >> v;
    vector<int> B(M); for (auto &&v : A) ++B[v];
    vector<int> F(M + 1);
    for (int i = 1, f = i + (i & -i); i <= M; ++i, f = i + (i & -i)) {
        F[i] += B[i - 1];
        if (f <= M) F[f] += F[i];
    }
    int Q; cin >> Q;
    while (Q--) {
        int a, b; cin >> a >> b;
        // 貯金額aをbに更新する場合
        for (int f = a + 1; f <= M; f += f & -f) --F[f];
        for (int f = b + 1; f <= M; f += f & -f) ++F[f];
        // a円以上 b円以下の個数を取得
        int cnt = 0;
        for (int f = b + 1; f; f -= f & -f) cnt += F[f];
        for (int f = a; f; f -= f & -f) cnt -= F[f];
        cout << cnt << endl;
    }
}

セグ木がお好きでしたらこちら。

#include <iostream>
#include <vector>
using namespace std;
int main() {
    int N; cin >> N; int M = 1e6 + 1;
    vector<int> A(N); for (auto &v : A) cin >> v;
    vector<int> B(M); for (auto &&v : A) ++B[v];
    vector<int> S(M << 1);
    for (int i = 0; i < M; i++) S[i + M] = B[i];
    for (int i = M - 1; i >= 0; i--) S[i] = S[i << 1] + S[i << 1 | 1];
    int Q; cin >> Q;
    while (Q--) {
        int a, b; cin >> a >> b;
        // 貯金額aをbに更新する場合
        int i = a; --S[i += M];
        while (i >>= 1) S[i] = S[i << 1] + S[i << 1 | 1];
        i = b; ++S[i += M];
        while (i >>= 1) S[i] = S[i << 1] + S[i << 1 | 1];
        // a円以上 b円以下の個数を取得
        int cnt = 0, l = a, r = b + 1;
        for (l += M, r += M; l < r; l >>= 1, r >>= 1) {
            if (l & 1) cnt += S[l++];
            if (r & 1) cnt +=  S[--r];
        }
        cout << cnt << endl;
    }
}

なお区間和だけでなく可換であれば区間の演算結果を保持できます。
抽象化したデータ構造をお求めの方はコメントいただきたいです。好評でしたら別で紹介しようと思います。

最適化問題でも高速化

準備・データ構造による高速化で上司に納得していただけたところで、紹介したこれらのテクニックが別の場面でも活躍することを見てみましょう。
有名なナップサック問題を例に最適化問題を考えてみます。
(今回の本題では無いので適当に読み流して大丈夫です。)

$${N}$$個の品物、容量$${W}$$のナップザックがあり、
$${i}$$番目の品物の価値は$${v_i}$$、重さが$${w_i}$$です。
とりうる価値合計の最大値を求めてください。
$${1 \le N \le 10^3}$$ $${1 \le W \le 10^4}$$ $${1 \le v_i,w_i \le 10^3}$$

動的計画法で有名な問題ですね。一応答えを書いておきます。
DPをご存知でない方はぜひこちらなどをご覧ください。とても楽しいです。

#include <iostream>
#include <vector>
using namespace std;
int main() {
    int N, W; cin >> N >> W;
    vector<int> v(N), w(N);
    for (int i = 0; i < N; ++i) cin >> v[i] >> w[i];
    vector<int> dp(W + 1, 0), nx;
    for (int i = 0; i < N; ++i, swap(dp, nx)) {
        nx = dp;
        for (int j = 0; j <= W; ++j) {
            if (w[i] > j) continue;
            nx[j] = max(nx[j], dp[j - w[i]] + v[i]);
        }
    }
    cout << dp[W] << endl;
}

このプログラムの最悪時間計算量は$${O(NW)}$$で今回の制約下では十分高速です。
あまりにもテンプレとなった問題ですし、そのまま有効なシーンは泥棒くらいなのでこれ以上言及することはありません。
しかしながら、動的計画法にはバリエーションと呼ぶのがおこがましい程に多くの用途があります。ほんと凄いアルゴリズムですね。
ではもっと現実的な状況を考えてみましょう。


またもや上司に仕事を振っていただきました。
「会社員全員の売り上げはこのグラフになってて
 業績に大きな差があれば青、だいたい同じなら赤をつけておいた」
「だいたい同じ成績な人ごとに分けて忘年会をしようと思っているが
 最大で何人のグループができるかを知りたいんだ」
「例えばこれなら7人が赤で繋がってるでしょう?ただ社員は3万人で、
 売り上げは高くて500万円くらいだなぁ。」
「あ、あとできたらお店の予約もよろしく。」

売り上げ

各社員の売り上げイメージ


  • そんな組み分け方しないでしょ

  • 最大長だけでいいのか

  • だいたい同じってなんやねん😡

  • 統計学をなめるな😤

などの声が聞こえてきそうですが、泥棒よりは現実的な問題ですね。
(最大長だけにしたのは簡単化のためです)
より本質的には

$${N}$$個の数$${A}$$について
次の数との絶対値が$${K}$$以下であるものの最大長。
例) K=2で成績「3, 100, 2, 4」なら「3, 2, 4」が
 最長のだいたい同じグループなので、答えは「3人」。
$${1 \le N \le 3 \times 10^4}$$
$${0 \le K \le 5 \times 10^6, K \in \mathbb{Z}}$$
$${0 \le A_i \le 5 \times 10^6, A_i \in \mathbb{Z}}$$

(ここもまだ本題では無いので適当に読み流して大丈夫です。)
一見すると動的計画法で済む問題ですが、高速化の余地があります。
ナップサック問題と同じようにDPを考えて
$${dp_{A_i} := A_i}$$がグループにいる時の最大長とすると漸化式は

$$
dp_{A_i} =
\begin{cases}
dp_i = 0 & (初期値)\
\underset{max(0, A_i-K)\le j \le min(A_i+K, 5 \times 10^6)}{max}dp_j+1\
\end{cases}
$$

とできるので、以下のようなプログラムになります。

#include <iostream>
#include <vector>
using namespace std;
int main() {
    int N, K; cin >> N >> K; int M = 5e6;
    vector<int> A(N); for (int &v : A) cin >> v;
    vector<int> dp(M + 1, 0);
    for (int i = 0; i < N; ++i) {
        int mx = 0;
        for (int j = max(0, A[i] - K); j <= min(A[i] + K, M); ++j) {
            mx = max(mx, dp[j]);
        }
        dp[A[i]] = mx + 1;
    }
    int mx = 0;
    for (auto &&v : dp) mx = max(mx, v);
    cout << mx << endl;
}

しかしこれでは$${O(NM)}$$ですので最悪$${15 \times 10^{10}}$$ループとなってしまいます。
それでは高速化していきましょう。ボトルネックを探します。
DPとして順番に処理する以上N周はしてしまいますが、1度の区間最大値を求める

$$
\underset{max(0, A_i-K)\le j \le min(A_i+K, 5 \times 10^6)}{max}dp_j
$$

の部分には高速化の余地がありそうです。
貯金額の検索では区間和を求めていましたが今回は区間最大値です。
さらに配列を更新しながら区間最大値を取得したいので、事前計算でなくデータ構造(セグ木)を使うことで高速化できます。

#include <iostream>
#include <vector>
using namespace std;
int main() {
    int N, K; cin >> N >> K; int M = 5e6; int sz = M + 1;
    vector<int> A(N); for (int &v : A) cin >> v;
    vector<int> dp(sz << 1, 0);
    for (int i = 0; i < N; ++i) {
        int mx = 0, l = max(0, A[i] - K), r = min(A[i] + K, M) + 1;
        for (l += sz, r += sz; l < r; l >>= 1, r >>= 1) {
            if (l & 1) mx = max(mx, dp[l++]);
            if (r & 1) mx = max(mx, dp[--r]);
        }
        dp[A[i] += sz] = mx + 1;
        while (A[i] >>= 1) dp[A[i]] = max(dp[A[i] << 1], dp[A[i] << 1 | 1]);
    }
    cout << dp[1] << endl;
}

これで$${O(NlogM)}$$での$${7 \times 10^5}$$程度となり高速です。
(定数倍を考慮するともうちょい行ってますが)

データ構造を活用することで晴れて高速化ができました。

状況に見合った工夫

工夫にもいろいろあるという例を紹介します。


社長「よしよし、じゃあ忘年会も終わったことだし会計とするか。」
社長「1万人いるから、端から10人ずつ、勘定の合計を教えてね」
社長「1000人の部長が払ってくれ」
部長A「社長はどうするんですか?」
社長「ありがとう」


勘のいいみなさんはお気づきかと思いますが、そうです。区間和です。
しかも忘年会は終わったので会計は増えません。
累積和を使ってもよいですが、端から10人ずつとわかってますので普通に計算したらよいだけです。

A = [int(x) for x in input().split()]
B = []
for i in range(100):
  B.append(sum([A[i * 10 + v] for v in range(10)]))
print(B)

このように、必ずしも何か工夫が必要というわけではありません


部長B「まって、10人の合計が一番低いとこがいい」
社長「え、ずる」
部長B「あんたは払ってないやろがい」
部長B「きみ、10人の合計の一番低いの求めてくれるかい?」
部長B「飛び飛びは面倒なので、隣り合う10人がいいな」


仕方ないですね。1万人の勘定で連続10人の合計の最小値を探しましょう。
愚直にやるとこのようになります。

A = [int(x) for x in input().split()]
B = 1e8 # 流石に1億円の勘定はないとする
for i in range(10000 - 10 + 1):
  s = sum([A[j] for j in range(i, i + 10)])
  B = min(B, s)
print(B)

さて、ループ数をみると10万回くらいです。特に工夫はしていませんが十分高速です。制約が小さいからですね。
やはり必ずしも何か工夫が必要というわけではありません。


それでは同じプログラムで制約が大きい時も考えてみましょう。
人数が100万で100人ごとの合計の最小値を求める場合だと
100万x100で1億回程計算することになります。高速化の余地ありです。
愚直解のボトルネックというか無駄な部分は、同じ人を何度も見に行っているところです。

差分だけ更新するようにしましょう。以下のイメージを持ちます。

移動前

最初の区間を計算後に次の区間を見る

移動後

100人分の区間の合計値を求めたら、区間を一つ右にスライドする時

1人目を消し101人目を追加するだけで2個目の区間の合計が求まります。

A = [int(x) for x in input().split()]
s = sum([A[i] for i in range(100)])
B = s
for i in range(1e6 - 100):
  s -= A[i]
  s += A[i + 100]
  B = min(B, s)
print(B)

人数$${N = 10^6}$$、区間幅$${M = 100}$$とすると
このプログラムのだいたい計算量は$${O(N)}$$で高速です。
累積和を用いた場合を考えてみると前計算$${O(N)}$$を区間走査$${O(N)}$$となり、時間計算量こそ変わらないものの定数倍や空間の無駄が際立ちます。私は好きですよ?

このように、高速化には状況に見合った工夫を考えることが必要です。
似たようなものでスライド最小値という手法がありますので、興味のある方は調べてみてください。

最後の話題

ここまで読んでいただきありがとうございました。本記事のメインの内容はこれで終わりです。お疲れ様でした。

最後に、さらにできる工夫の例として私の好きなアルゴリズムを紹介します。
ダイエットでどこまで痩せられるかを考えてみましょう。

あなたは$${N}$$日間ダイエットをすることを決意しました。
ダイエット開始時点でのストレス値は0です。日に1食とります。
ダイエット開始から$${i}$$日目の食事で$${D_i}$$kg体重が増えます。
また1日絶食すると$${A}$$kg痩せますがストレス値が1溜まり、その日の夜に$${B \times }$$(ストレス値)kgだけ太ります。
食事を摂るとストレス値は0に戻ります。
ダイエット開始前体重$${W}$$kgとしたとき、$${N}$$日間で何kgに痩せることができる最小値を求めてください。

出典

とても難しいので簡潔に説明すると

$$
dp_i := (i日目に食事した際の体重変化の最小値)\\
dp_i =
\begin{cases}
0 & i = 0\\
\underset{0 \le j \le i-1}{min}
(
dp_j - A(i-j-1) + \frac{(i-j) \times (i-j-1)}{2}B + D_i
) & otherwise
\end{cases}
$$

(j+1日目~i−1日目は断食しており、減少分+$${B \times \sum}$$等差数列ということ)
これを式変形し

$$
dp_i = \underset{0 \le j \le i-1}{min}
{
(-jB)i +
(dp_j + jA + \frac{j^2 + j}{2}B)
}
-A(i-1)+\frac{i^2-i}{2}B+D_i
$$

とすると、最小値を求める関数部が$${i}$$の一次式になっています。
この時、ConvexHull Trickというアルゴリズムが火を吹きます。
詳細はこちらなどをご覧ください。
直線集合で追加と$${x}$$での最小値を高速に求められるテクです。
凸包における性質を良く活かしていて好きです。

ただダイエットしてるだけなのに、式にすると数学って便利だなと思わされますね。実際のコードは略しますが、興味があれば調べてみてください。

あとがき

記事中のコードは一応動作確認をしていますが、万が一ミスなどありましたら温かく教えていただけると大変助かります。

また要所で説明を省きましたが、これらも声が多ければ別途紹介したいと思っています。

以上

ALHについて知る



↓ ↓ ↓ 採用サイトはこちら ↓ ↓ ↓ 


↓ ↓ ↓ コーポレートサイトはこちら ↓ ↓ ↓


↓ ↓ ↓ もっとALHについて知りたい? ↓ ↓ ↓



みんなにも読んでほしいですか?

オススメした記事はフォロワーのタイムラインに表示されます!