Nested Sets Model(入れ子集合モデル)でツリー構造を扱う

企業向けのシステムで階層的な部署やグループを扱いたいときや、掲示板のスレッドを階層的に表現したい場合など、RDBでツリー構造を扱いたいことってけっこうありますよね。こういうときって、まず最初に思いつくのが、親のレコードIDを子レコードの外部キーとして保持する方法ではないかと思います。この方法を隣接リストモデルといいます。

Ruby on RailsのActiveRecordでいうと、こんな感じです。

class Department < ApplicationRecord
belongs_to :parent, class_name: 'Department', inverse_of: :children, optional: true
has_many :children, class_name: 'Department', foreign_key: :parent_id, inverse_of: :parent

このモデルでも用途によっては問題なく動作しますが、階層が増えるにつれ、例えばある社員が所属する部署の直属の上司を取得したいとか、子スレッドをまとめて取得したい場合に、データベースへの問い合わせが増え、パフォーマンスが劣化してしまうことがあります。

そのため、隣接リストモデルの他にも階層構造を扱うためにいくつかのモデルが考案されています。

  • ancestry(経路列挙)モデル
  • closure_tree (閉包テーブル)モデル
  • nested_set (入れ子集合)モデル

ここではRunbookでも使用している、nested_set(入れ子集合)モデルをActiveRecordで扱う方法について紹介します。

nested_set (入れ子集合)モデルとは

https://en.wikipedia.org/wiki/Nested_set_model

400px-Clothing-hierarchy-traversal-2.svg.png

各ノードを表すレコードに、left/right属性を持たせるのがこのモデルの特徴となります。left/rightはルートからリーフに向かって一意の番号が割り振られていきます。

あるノードの先祖はこのように一回のクエリで取得することができます。

def ancestors
return Department.none if self.lft == 0 || self.rgt == 0
self
.class
.where(['lft < ?', self.lft])
.where(['rgt > ?', self.lft])
.order(:lft)
end

同様に、あるノードの子孫はこのように取得できます。

def descendants
return Department.none if self.lft == 0 || self.rgt == 0
self
.class
.where(['lft > ?', self.lft])
.where(['lft < ?', self.rgt])
.order(:lft)
end

この他にも、リーフノードのみを取得したり、兄弟ノードの子孫を取得することも容易にできます。

https://mikehillyer.com/articles/managing-hierarchical-data-in-mysql/

Nested Set Modelのデメリット

  1. ノードの追加や削除、移動が頻繁に行われる場合、left/rightの値を再計算する必要があり、大量のデータを持つツリーではパフォーマンスに影響が出る可能性がある。
  2. 他のツリーモデルに比べて理解や実装が難しい。

ActiveRecordによる実装

以下はシンプルな実装例です。追加、更新、削除が発生する度、ルートノードをロックして変更が競合しないようにしています。

# frozen_string_literal: true

# == Schema Information
#
# Table name: departments
#
# id :bigint not null, primary key
# parent_id :bigint
# lft :bigint default(0), not null
# rgt :bigint default(0), not null
# name :string(255) not null
# depth :integer default(0), not null
# created_at :datetime not null
# updated_at :datetime not null
#
# Indexes
#
# index_departments_on_parent_id (parent_id)
#

class Department < ApplicationRecord
belongs_to :parent, class_name: 'Department', inverse_of: :children, optional: true
has_many :children, class_name: 'Department', foreign_key: :parent_id
, inverse_of: :parent

before_validation :initialize_tree, on: :create
validates :name, presence: true

after_save :update_tree
before_destroy :remove_with_lock!

def self.root
find_by(parent_id: nil)
end

def root?
parent_id.nil?
end

def ancestors
return Department.none if self.lft == 0 || self.rgt == 0
self
.class
.where(['lft < ?', self.lft])
.where(['rgt > ?', self.lft])
.order(:lft)
end

def descendants
return Department.none if self.lft == 0 || self.rgt == 0
self
.class
.where(['lft > ?', self.lft])
.where(['lft < ?', self.rgt])
.order(:lft)
end

private

def initialize_tree
return unless root?
self.lft = 1
self.rgt = 2
end

def update_tree
return if root?
if self.saved_change_to_parent_id?
self.id_before_last_save.nil? ? append_with_lock! : move_with_lock!
end
end

def append
new_left = self.parent.rgt
self.class.where('rgt >= ?', new_left).update_all('rgt = rgt + 2')
self.class.where('lft > ?', new_left).update_all('lft = lft + 2')
self.update_columns(
lft: new_left,
rgt: new_left + 1,
depth: self.parent.depth + 1
)
end

def remove
width = self.rgt - self.lft + 1
self
.class
.where('lft >= ?', self.lft)
.where('rgt <= ?', self.rgt)
.update_all('lft = 0, rgt = 0')
self
.class
.where('rgt > ?', self.rgt)
.update_all("rgt = rgt - #{width}")
self
.class
.where('lft > ?', self.rgt)
.update_all("lft = lft - #{width}")
end

def move
width = self.rgt - self.lft + 1
new_left = self.parent.rgt
new_left -= width if self.parent.rgt > self.rgt

self
.class
.where('lft >= ?', self.lft)
.where('rgt <= ?', self.rgt)
.update_all('lft = 0 - lft, rgt = 0 - rgt')

self
.class
.where('rgt > ?', self.rgt)
.update_all("rgt = rgt - #{width}")

self
.class
.where('lft > ?', self.rgt)
.update_all("lft = lft - #{width}")

self
.class
.where('rgt >= ?', new_left)
.update_all("rgt = rgt + #{width}")

self
.class
.where('lft > ?', new_left)
.update_all("lft = lft + #{width}")

offset = new_left - self.lft
self
.class
.where('lft < 0')
.where('rgt < 0')
.update_all(
"lft = 0 - lft + #{offset}, rgt = 0 - rgt + #{offset}, depth = depth + #{self.parent.depth + 1 - self.depth}"
)
end

def append_with_lock!
Department.root.lock!
self.parent.reload
append
end

def remove_with_lock!
Department.root.lock!
self.reload
remove
end

def move_with_lock!
Department.root.lock!
self.parent.reload
self.reload
if self.parent.id == self.id ||
self.parent.ancestors.where(id: self.id).exists?
errors.add(:parent, :invalid)
raise ActiveRecord::RecordInvalid, self
end
move
end
end