Engineering from Scratch

エンジニア目指してます

Rubyでの並行処理

並列処理について学習する機会があったため,Rubyで知識のブラッシュアップを行う。

プロセスとスレッド

両者の名前を頻繁に聞くが,全く意味を理解していなかった。プロセスは実行されるプログラムの単位で,その中にスレッドが1つ以上存在する。

そして,1つのスレッドは1つのコアによって動かされる。

また,プロセスごとでメモリ空間は独立しているが,マルチスレッドの場合,同じプロセス内に存在するので共有のメモリ空間を使用する。

並行処理と並列処理

両者の区別が曖昧に使われていることが多いと思うが,以下の説明が理解しやすかった。

並行処理: ある時間の範囲において、複数のタスクを扱うこと 並列処理: ある時間の点において、複数のタスクを扱うこと

並行処理実践

以下では,実際にRubyを使用して,並行処理を試してみる。

https://tech.unifa-e.com/entry/2021/01/29/175021 を参考にした。

通常の処理

実装は以下となる。

def sleepy
  puts Process.pid
  sleep 3
end

def test
  start_time = Time.now
  4.times { sleepy }
  end_time = Time.now
  puts end_time - start_time
end

sleepyでは,プロセスのIDを出力し,3秒間待つ。testでは,sleepyを4回普通に実行し,その経過時間を計測した。

testメソッドの実行結果は以下となる。

89867
89867                     
89867                     
89867                     
12.00396

全てのプロセスIDは等しく,経過時間も3*4=12秒で,逐次的に処理が行われることがわかる。

スレッドでの並行処理

実装は以下となる。

def test1
  start_time = Time.now
  Parallel.each(Array.new(4), in_threads: 4) { sleepy }
  end_time = Time.now
  puts end_time - start_time
end

実行結果は以下となる。

89867
89867                                                         
89867                                                         
89867                                                         
3.005605

全て同じプロセス内のスレッドで実行されているため,プロセスIDは等しく,しかし並行で処理が行われているため,処理時間は3秒となる。

プロセスでの並行処理

実装は以下となる。

def test2
  start_time = Time.now
  Parallel.each(Array.new(4), in_processes: 4) { sleepy }
  end_time = Time.now
  puts end_time - start_time
end

実行結果は以下となる。

90074
90075                     
90076                     
90077                     
3.046518

4つのプロセス内の各スレッドで処理が実行されているため,全てのプロセスIDは異なり,並行で処理が行われているため,実行時間は3秒となる。

補足

参考リンク内では,並行処理と記載されていることが最初は理解できなかった。並列処理が実行されているのではないかと思っていた。そこで,まずは自分のPCのコア数を調べてみた。

sysctl -n hw.physicalcpu_max => 4 # 物理コア数

よって,1コアあたり2スレッドの実行を行うことができるため,8個のスレッドを並列で処理を行うことができる。そこで,100個のプロセスを立てて処理を実行してみる。

実装は以下となる。

def test3
  start_time = Time.now
  Parallel.each(Array.new(20), in_processes: 20) { sleepy }
  end_time = Time.now
  puts end_time - start_time
end

処理結果は以下となる。

91009
91010                     
91011                     
91012                     
91013                     
91015                     
91014                     
91017                     
91016                     
91019                     
91018                     
91021                     
91022                     
91020
91023
91024
91028
91025
91027
91026
3.08953

プロセスが20個立ち,処理時間は3秒となっている。CPUのコア数を考えれば最大で8個までしかプロセスが立たないのに,それを超える20個のプロセスが立っている。というのも,今回のケースでは,20個のプロセスが並列で処理されているわけではなく,並行で処理されているためこのようにCPUのコア数を超えるプロセスを立てることができる。よってこれが理由でブログでは並行処理と正しく記載されていた。

Fiberを使った処理

参考ブログでも載せられてるように,Fiberで並行処理を行う実装が複雑だったため,asyncというgemを使用して並行処理を実装する。実装は以下。

require 'async'

def sleepy
  Async do
    puts Process.pid
    sleep 3
  end
end

def test5
  start_time = Time.now
  task = Async do
    4.times { sleepy }
  end
  task.wait
  end_time = Time.now
  puts end_time - start_time
end

実行結果は以下となる。

28292829
                          
2829                      
2829                      
3.000941  

プロセスIDは全て同じで,3秒後に処理が終了しているため,今まで通り並行で処理が行われていることがわかる。

補足

今回,上記のようにFiberで並行処理を行うFiber Schedulerの詳細については調べきれなかったが,Fiberの詳細を少しだけ調べたので,補足する。

Fiberには,.yield#resumeというメソッドが存在する。.yieldで渡した引数が,#resumeを呼び出すと返される。#resume.yieldを呼び出した回数+1回呼び出すことができ,最後の#resumeでは,.newブロックの戻り値が渡される。

具体的な実装で見てみる。

irb(main):077:1* fiber = Fiber.new do
irb(main):078:1*   1
irb(main):079:1*   Fiber.yield 2
irb(main):080:1*   3
irb(main):081:0> end
=> #<Fiber:0x000000010e1c2a78 (irb):77 (created)>
irb(main):082:0> fiber.resume
=> 2
irb(main):083:0> fiber.resume
=> 3
irb(main):084:0> fiber.resume
(irb):84:in `resume': attempt to resume a terminated fiber (FiberError)

fiberにはFiberインスタンスであり,一回目の#resumeでは,.yieldで渡された2が返される。二回目の.yieldでは,.newブロックの戻り値が返され,三回目は.yieldが今回は一回しか呼びされてないので,エラーとなる。

また,#resumeが呼び出された時は,.yieldで渡された引数が返されるが,処理は,.yieldの直前まで行われ,次の#resumeが呼び出されると,.yieldの処理から再開される。実装を見てみる。

irb(main):091:1* fiber = Fiber.new do
irb(main):092:1*   puts 1
irb(main):093:1*   puts Fiber.yield 2
irb(main):094:1*   puts 3
irb(main):095:0> end
=> #<Fiber:0x000000010e021728 (irb):91 (created)>
irb(main):096:0> fiber.resume
1
=> 2                                                                      
irb(main):097:0> fiber.resume

3                                                                    
=> nil

一回目の#resumeでは,puts 1が実行され返り値は,.yieldの引数である2となる。二回目の#resumeでは,puts Fiber.yield 2から処理が始まり,puts 3まで実行される。