Theory Serializable

(*<*)
theory Serializable
  imports Main
begin
(*>*)

section ‹Transactions›

text ‹We work with a rather abstract model of transactions comprised of read/write actions.›

text ‹Read/written values are natural numbers.›
type_alias val = nat

text ‹Transactions typ'xid may read from/write two addresses typ'addr.›
datatype ('xid, 'addr) action = isRead:  Read  (xid_of: 'xid) (addr_of: 'addr)
                              | isWrite: Write (xid_of: 'xid) (addr_of: 'addr)

text ‹A schedule is a sequence of actions.›
type_synonym ('xid, 'addr) schedule = ('xid, 'addr) action list

text ‹A database, which is being modified by the read/write actions, maps addresses to values.›
type_synonym 'addr db = 'addr  val

text ‹Each transaction has a local state, which is represented as the list of previously read values (and the addresses they have been read from).›
type_synonym 'addr xstate = ('addr × val) list

text ‹The values written by a transaction are given by a higher-order parameter and may depend on the previously read values.›
context fixes write_logic :: 'xid  'addr xstate  'addr  val begin

text ‹Read values are recorded in the transaction's local state; writes modify the database.›
fun action_effect :: ('xid, 'addr) action  ('xid  'addr xstate) × 'addr db  ('xid  'addr xstate) × 'addr db where
  action_effect (Read xid addr) (xst, db) = (xst(xid := (addr, db addr) # xst xid), db)
| action_effect (Write xid addr) (xst, db) = (xst, db(addr := write_logic xid (xst xid) addr))

text ‹We are interested in how a schedule modifies the database (local state changes are discarded at the end).›
definition schedule_effect :: ('xid, 'addr) schedule  'addr db 'addr db where
  schedule_effect s db = snd (fold action_effect s (λ_. [], db))

end

text ‹Actions that belong to the same transaction.›
definition eq_xid where
  eq_xid a b = (xid_of a = xid_of b)

section ‹Serial and Serializable Schedules›

declare length_dropWhile_le[termination_simp]

text ‹A serial schedule does not interleave actions of different transactions.›
fun serial :: ('xid, 'addr) schedule  bool where
  serial [] = True
| serial (a # as) = (let bs = dropWhile (λb. eq_xid a b) as
    in serial bs  xid_of a  xid_of ` set bs)

text ‹A schedule terms can be rearranged into schedule termt by a permutation termπ,
which preserves the relative order of actions related by termeq.›
definition permutes_upto where
  permutes_upto eq π s t = 
     (bij_betw π {..<length s} {..<length t} 
      (i<length s. s ! i = t ! π i) 
      (i<length s. j<length s. i < j  eq (s ! i) (s ! j)  π i < π j))

lemma permutes_upto_Nil[simp]: permutes_upto R π [] []
  by (auto simp: permutes_upto_def bij_betw_def)

text ‹Two schedules are equivalent if one can be rearranged into another without
rearranging the actions of each transaction and in addition they have the
same effect on any database for any fixed write logic.›
abbreviation equivalent :: ('xid, 'addr) schedule  ('xid, 'addr) schedule  bool where
  equivalent s t  (π. permutes_upto eq_xid π s t  (write_logic db. schedule_effect write_logic s db = schedule_effect write_logic t db))

text ‹A schedule is serializable if it is equivalent to some serial schedule. Serializable schedules
thus provide isolation: even though actions of different transactions may be interleaved, the effect
from the point of view of each transaction is as if the transaction was the only one executing in the system
(as is the case in serial schedules).›
definition serializable :: ('xid, 'addr) schedule  bool where
  serializable s = (t. serial t  equivalent s t)

section ‹Conflict Serializable Schedules›

text ‹Two actions of different transactions are conflicting if they access the same address and at least one of them is a write.›
definition conflict where
  conflict a b = (xid_of a  xid_of b  addr_of a = addr_of b  (isWrite a  isWrite b))

text ‹Two schedules are conflict-equivalent if one can be rearranged into another without rearranging conflicting actions
or actions of one transaction. Note that unlike equivalence, the conflict-equivalence notion is purely syntactic, i.e.,
not talking about databases and action/schedule effects.›
abbreviation conflict_equivalent :: ('xid, 'addr) schedule  ('xid, 'addr) schedule  bool where
  conflict_equivalent s t  (π. permutes_upto (sup eq_xid conflict) π s t)

text ‹A schedule is conflict-serializable if it is conflict equivalent to some serial schedule.›
definition conflict_serializable :: ('xid, 'addr) schedule  bool where
  conflict_serializable s = (t. serial t  conflict_equivalent s t)

section ‹Conflict-Serializability Implies Serializability›

text ‹In the following, we prove that the syntactic notion implies the semantic one.
The key obsevation is that swapping non-conflicting actions of different transactions
preserves the overall effect on the database.›

lemma swap_actions: ¬ conflict a b  ¬ eq_xid a b 
  action_effect wl a (action_effect wl b st) = action_effect wl b (action_effect wl a st)
  unfolding conflict_def eq_xid_def
  by (cases a; cases b; cases st) (auto simp add: fun_upd_twist insert_commute)

lemma swap_many_actions: i < length p. ¬ conflict a (p ! i)  ¬ eq_xid a (p ! i) 
  action_effect wl a (fold (action_effect wl) p st) = fold (action_effect wl) p (action_effect wl a st)
proof (induct p arbitrary: st)
  case (Cons b p)
  from Cons(2) show ?case
    unfolding fold_simps
    by (subst Cons(1)[of action_effect wl b st]) (auto simp: swap_actions[of a b])
qed simp

lemma fold_action_effect_eq:
  assumes t = p @ a # u
  shows 
  fold (action_effect wl) s (action_effect wl a st) =
   fold (action_effect wl) (p @ u) (action_effect wl a st) 
   i < length p. ¬ conflict a (p ! i)  ¬ eq_xid a (p ! i) 
   fold (action_effect wl) (a # s) st = fold (action_effect wl) t st
  unfolding schedule_effect_def fold_simps fold_append o_apply assms
  by (subst swap_many_actions) auto

definition shift where
  shift π = (λi. if i < π 0 then i else i - 1) o π o Suc

lemma bij_betw_remove: bij_betw f A B  x  A  bij_betw f (A - {x}) (B - {f x})
  unfolding bij_betw_def by (auto simp: inj_on_def)

lemma permutes_upto_shift: 
  assumes permutes_upto eq π (a # s) t
  shows permutes_upto eq (shift π) s (take (π 0) t @ drop (Suc (π 0)) t)
proof -
  from assms have π: bij_betw π {..< Suc (length s)} {..< length t}
     i. i < length s + Suc 0  (a # s) ! i = t ! π i
     i j. i < j  i < length s + Suc 0  j < length s + Suc 0 
       eq ((a # s) ! i) ((a # s) ! j)  π i < π j
    unfolding permutes_upto_def by auto
  from π(1) have distinct: π (Suc i)  π 0 if i < length s for i
    using that unfolding bij_betw_def by (auto dest!: inj_onD[of π _ Suc i 0])
  from π(1) have le: π ` {..< Suc (length s)}  {..< length t}
    unfolding bij_betw_def by auto
  then have bij_betw (λi. if i < π 0 then i else i - 1) ({..< length t} - {π 0}) {..< length t - 1}
    by (cases t) (auto 0 3 simp: bij_betw_def inj_on_def image_iff not_less subset_eq Ball_def)
  moreover have bij_betw π {Suc 0 ..< Suc (length s)} ({..< length t} - {π 0})
    using bij_betw_remove[where x = 0, OF π(1)]
    by (simp add: atLeast1_lessThan_eq_remove0)
  moreover have bij_betw Suc {..< length s} {Suc 0 ..< Suc (length s)}
    by (auto simp: lessThan_atLeast0)
  ultimately have bij_betw (shift π) {..< length s} {..< length t - Suc 0}
    unfolding shift_def by (auto intro: bij_betw_trans)
  moreover have s ! i = (take (π 0) t @ drop (Suc (π 0)) t) ! shift π i if i < length s for i
    using that π(2)[of Suc i] le distinct[of i]
    by (force simp: shift_def nth_append not_less subset_eq min_def)
  moreover have shift π i < shift π j
    if i<length s j<length s i < j eq (s ! i) (s ! j) for i j
    using that π(3)[of Suc i Suc j] le distinct[of i] distinct[of j]
    by (auto simp: shift_def not_less subset_eq)
  ultimately show ?thesis
    unfolding permutes_upto_def using le by (auto simp: min_def)
qed

lemma permutes_upto_prefix_upto:
  assumes permutes_upto eq π (t ! π 0 # s) t i < π 0
  shows ¬ eq (t ! π 0) (t ! i)
proof
  assume eq (t ! π 0) (t ! i)
  moreover
  from assms have π: bij_betw π {..< Suc (length s)} {..< length t}
     i. i < length s + Suc 0  (t ! π 0 # s) ! i = t ! π i
     i j. i < j  i < length s + Suc 0  j < length s + Suc 0 
       eq ((t ! π 0 # s) ! i) ((t ! π 0 # s) ! j)  π i < π j
    unfolding permutes_upto_def by auto
  define k where k = the_inv_into {..<Suc (length s)} π i
  from π(1) assms(2) have i < length t
    using bij_betwE lessThan_Suc_eq_insert_0 by fastforce
  with π(1) assms(2) have k > 0 π k = i k < Suc (length s)
    using f_the_inv_into_f[of π {..<Suc (length s)} i]
          the_inv_into_into[of π {..<Suc (length s)} i, OF _ _ subset_refl]
    by (fastforce simp: bij_betw_def set_eq_iff image_iff k_def)+
  ultimately have π 0 < π k
    using π(2)[of k] by (intro π(3)[of 0 k]) auto
  with assms(2) π k = i show False by auto
qed

lemma conflict_equivalent_imp_equivalent:
  assumes conflict_equivalent s t
  shows equivalent s t
proof -
  from assms obtain π where π: permutes_upto (sup eq_xid conflict) π s t
    by blast
  moreover from π have fold (action_effect wl) s st = fold (action_effect wl) t st for wl st
  proof (induct s arbitrary: π t st)
    case Nil
    then show ?case
      by (force simp: permutes_upto_def bij_betw_def)
  next
    case (Cons a s)
    from Cons(2) have π 0 < length t and a: a = t ! π 0
      by (auto simp add: permutes_upto_def bij_betw_def)
    with Cons(2) show ?case
      by (intro fold_action_effect_eq)
        (auto simp only: length_take min_absorb2 less_imp_le nth_take
          intro!: id_take_nth_drop[of π 0 t] Cons(1)[where π=shift π]
          dest: permutes_upto_prefix_upto elim!: permutes_upto_shift)
  qed
  then have schedule_effect wl s db = schedule_effect wl t db for wl db
    unfolding schedule_effect_def by auto
  ultimately show ?thesis
    unfolding permutes_upto_def by blast
qed

theorem conflict_serializable_imp_serializable: conflict_serializable s  serializable s
  unfolding conflict_serializable_def serializable_def
  using conflict_equivalent_imp_equivalent by blast

section ‹ Schedules Generated by Strict Two-Phase Locking (S2PL). ›

text ‹To enforce conflict-serializability database management systems use
locks. Locks come in two kinds: shared locks for reads and exclusive locks for writes.
An address can be accessed in a reading fashion by multiple transactions, each holding a shared lock.
If one transaction however holds an exclusive locks to write to an address, then no
other transaction can hold a lock (neither shared nor exclusive) for the same address.›

datatype 'addr lock = S (addr_of: 'addr) | X (addr_of: 'addr)

fun lock_for where
  lock_for (Read _ addr) = S addr
| lock_for (Write _ addr) = X addr

definition valid_locks where
  valid_locks locks = (addr xid1 xid2. X addr  locks xid1 
     X addr  locks xid2  S addr  locks xid2  xid1 = xid2)

text ‹A frequently used lock strategy is strict two phase locking (S2PL) in which
transactions attempt to acquire locks gradually (whenever they want to execute an action
that needs a particular lock) and release them all at once at the end of each transaction.

The following predicate checks whether a schedule could have been generated using the S2PL strategy.
To this end, the predicate checks for each action, whether the corresponding lock could have been acquired by
the transaction executing the action. We also allow lock upgrades (from shared to exclusive), i.e., one
transaction can hold both a shared and and exclusive lock

As in our model there is no explicit transaction end marker (commit),
we treat each transaction as finished immediately when it has executed its last action in the given schedule.
This is the moment, when the transaction's locks are released.›

fun s2pl :: ('xid  'addr lock set)  ('xid, 'addr) schedule  bool where
  s2pl locks [] = True
| s2pl locks (a # s) = 
     (let xid = xid_of a; addr = action.addr_of a
     in if xid'. xid'  xid  (X addr  locks xid'  isWrite a  S addr  locks xid')
     then False
     else s2pl (locks(xid := if xid  xid_of ` set s then {} else locks xid  {lock_for a})) s)

text ‹We prove in the following that S2PL schedules are conflict-serializable (and thus also serializable).
The proof proceeds by induction on the number of transactions in a schedule. To construct the conflict-equivalent
serial schedule we always move the actions of the transaction that finished first in our S2PL schedule to the front.
To do so we show that these actions are not conflicting with any preceding actions (due to the acquired/held locks).›

lemma conflict_equivalent_trans:
  conflict_equivalent s t  conflict_equivalent t u  conflict_equivalent s u
proof (elim exE)
  fix π π'
  assume permutes_upto (sup eq_xid conflict) π s t permutes_upto (sup eq_xid conflict) π' t u
  then show conflict_equivalent s u
    by (intro exI[of _ π'  π])
      (auto simp: permutes_upto_def bij_betw_trans dest: bij_betwE)
qed

lemma conflict_equivalent_append: conflict_equivalent s t  conflict_equivalent (u @ s) (u @ t)
proof (elim exE)
  fix π
  assume π: permutes_upto (sup eq_xid conflict) π s t
  define π' where π' x = (if x < length u then x else π (x - length u) + length u) for x
  define ππ' where ππ' x = (if x < length u then x
    else the_inv_into {..<length s} π (x - length u) + length u) for x
  from π have bij_betw π' {..<length u + length s} {..<length u + length t}
    unfolding bij_betw_iff_bijections
    by (auto simp: permutes_upto_def bij_betw_def π'_def ππ'_def
      the_inv_into_f_f f_the_inv_into_f split: if_splits
      intro!: exI[of _ ππ'] the_inv_into_into[OF _ _ subset_refl, of _ {..<length s}, simplified])
  with π show conflict_equivalent (u @ s) (u @ t)
    by (intro exI[of _ π']) (auto simp: permutes_upto_def nth_append π'_def)
qed

lemma conflict_equivalent_Cons: conflict_equivalent s t  conflict_equivalent (a # s) (a # t)
  by (metis append_Cons append_Nil conflict_equivalent_append)

lemma conflict_equivalent_rearrange:
  assumes i j. xid_of (s ! i) = xid  j < i  i < length s  ¬ conflict (s ! j) (s ! i)
  shows conflict_equivalent s (filter ((=) xid  xid_of) s @ filter (Not  (=) xid  xid_of) s)
    (is conflict_equivalent s (?filter s))
  using assms
proof (induct length (filter ((=) xid  xid_of) s) arbitrary: s)
  case 0
  then show ?case
    by (auto simp: filter_empty_conv permutes_upto_def intro!: exI[of _ id])
next
  case (Suc x)
  define i where i = (LEAST i. i < length s  xid_of (s ! i) = xid)
  from Suc(2) have i < length s. xid_of (s ! i) = xid
    by (auto simp: Suc_length_conv filter_eq_Cons_iff nth_append nth_Cons')
  then have i < length s  xid_of (s ! i) = xid
    unfolding i_def by (elim LeastI_ex)
  note i = conjunct1[OF this] conjunct2[OF this]
  then have lessi: j < i. xid_of (s ! j)  xid
    using i_def less_trans not_less_Least by blast
  with i have filter_take: filter ((=) xid  xid_of) (take i s) = []
    by (intro filter_False[of take i s ((=) xid  xid_of)]) (auto simp: nth_image[symmetric])
  with i have *: filter ((=) xid  xid_of) s = s ! i # filter ((=) xid  xid_of) (drop (Suc i) s)
    by (subst id_take_nth_drop[of i s]) auto
  from i Suc(3) have j < i. ¬ conflict (s ! j) (s ! i) by blast
  with i lessi have conflict_equivalent s (s ! i # take i s @ drop (Suc i) s)
    (is conflict_equivalent s (s ! i # ?s))
    by (intro exI[of _ λk. if k = i then 0 else if k < i then k + 1 else k])
      (auto simp: permutes_upto_def min_def bij_betw_def inj_on_def nth_append eq_xid_def
      nth_Cons' dest!: gr0_implies_Suc)
  also from Suc(2-) i filter_take have conflict_equivalent (s ! i # ?s) (s ! i # ?filter ?s)
    by (intro conflict_equivalent_Cons Suc(1)) (auto simp: * nth_append)
  also (conflict_equivalent_trans) from i lessi filter_take have s ! i # ?filter ?s = ?filter s
    unfolding * by (subst (8) id_take_nth_drop[of i s]) fastforce+
  finally show ?case .
qed

lemma serial_append:
  serial s  serial t  xid_of ` set s  xid_of ` set t = {}  serial (s @ t)
proof (induct s rule: serial.induct)
  case (2 a as)
  then show ?case 
    using dropWhile_eq_self_iff[THEN iffD2, of t eq_xid a]
    by (fastforce simp: Let_def image_iff dropWhile_append eq_xid_def[symmetric] dest: set_dropWhileD)
qed simp

lemma serial_same_xid: x  set s. xid_of x = xid  serial s
  using dropWhile_eq_Nil_conv[of (λx. xid = xid_of x) tl s]
  by (cases s) (auto simp: Let_def eq_xid_def[abs_def] equals0D simp del: dropWhile_eq_Nil_conv)

lemma conflict_equivalent_same_set: conflict_equivalent s t  set s = set t
  unfolding permutes_upto_def bij_betw_def
  by (auto simp: set_eq_iff in_set_conv_nth Bex_def image_iff) metis+

lemma s2pl_filter:
  s2pl locks s  s2pl (locks(xid := {})) (filter (Not o (=) xid o xid_of) s)
  by (induct locks s rule: s2pl.induct) (auto simp: Let_def fun_upd_twist split: if_splits)

lemma valid_locks_grab[simp]: valid_locks locks 
  ¬ (xid'. xid'  xid_of a 
      (X (action.addr_of a)  locks xid'  isWrite a  S (action.addr_of a)  locks xid')) 
  valid_locks (locks(xid_of a := insert (lock_for a) (locks (xid_of a))))
  by (cases a) (auto simp: valid_locks_def)

lemma s2pl_suffix: valid_locks locks  s2pl locks (s @ t) 
  a  set s. b  set t. eq_xid a b 
  locks'. valid_locks locks'  (xid. locks xid  locks' xid)  s2pl locks' t
proof (induct s arbitrary: locks)
  case (Cons a s)
  from Cons(1)[OF valid_locks_grab[of locks a]] Cons(2-) show ?case
    by (auto simp add: Let_def eq_xid_def fun_upd_def cong: if_cong split: if_splits) blast+
qed auto

lemma set_drop: l  length xs  set (drop l xs) = nth xs ` {l..<length xs}
  by (auto simp: set_conv_nth image_iff) (metis add.commute le_iff_add less_diff_conv)

lemma drop_eq_Cons: i < length xs  drop i xs = xs ! i # drop (Suc i) xs
  by (subst id_take_nth_drop[of i xs]) auto

theorem s2pl_conflict_serializable: s2pl (λ_. {}) s  conflict_serializable s
proof (induct card (xid_of ` set s) arbitrary: s)
  case 0
  then show ?case
    by (auto simp: conflict_serializable_def intro!: exI[of _ []])
next
  case (Suc x)
  define i where i = (LEAST i. i < length s  (j  {i+1 ..< length s}. ¬ eq_xid (s ! i) (s ! j)))
  have i < length s  (j  {i+1 ..< length s}. ¬ eq_xid (s ! i) (s ! j))
    unfolding i_def by (rule LeastI[of _ length s - 1], use Suc(2) in cases s) auto
  note i = conjunct1[OF this] conjunct2[OF this, rule_format]
  define xid where xid = xid_of (s ! i)
  with i have xid: xid  xid_of ` set s
    by (auto simp: image_iff in_set_conv_nth Bex_def)
  from i(1) have *: k  {i ..< length s}. eq_xid (s ! j) (s ! k) if j < i for j
  proof (cases xid = xid_of (s ! j))
    case False
    with that i(1) show ?thesis
    proof (induct i - j arbitrary: j rule: less_induct)
      case less
      from j < i less_trans[OF j < i i < length s]
      obtain j' where j'  {j+1 ..< length s} eq_xid (s ! j) (s ! j')
        by (subst (asm) i_def) (force dest: not_less_Least)
      with less(1)[of j'] less(2-) show ?case
        by (cases j'  i) (auto simp: diff_less_mono2 eq_xid_def)
    qed
  qed (auto simp: xid_def eq_xid_def Bex_def)
  have conflict_equivalent s (filter ((=) xid  xid_of) s @ filter (Not  (=) xid  xid_of) s)
  proof (intro conflict_equivalent_rearrange notI)
    fix k l
    let ?xid = xid_of (s ! k)
    assume kl: xid_of (s ! l) = xid k < l l < length s conflict (s ! k) (s ! l)
    with i(2) have li: l  i
      unfolding xid_def eq_xid_def
      by (metis One_nat_def add.right_neutral add_Suc_right atLeastLessThan_iff not_less_eq_eq)
    from k < l have drop_alt: drop l s = drop (l - Suc k) (drop (Suc k) s)
      by auto
    from li kl have take_drop_l: aset (take l s). bset (drop l s). eq_xid a b
      by (force simp: nth_image[symmetric] set_drop Bex_def conj_commute
        dest!: *[OF less_le_trans])
    from li kl have aset (take k s). bset (drop k s). eq_xid a b
      by (force simp: nth_image[symmetric] set_drop Bex_def conj_commute
        dest!: *[OF less_le_trans[OF less_trans[OF _ k < l]]])
    with kl Suc(3) obtain locks where valid_locks locks s2pl locks (drop k s)
      using s2pl_suffix[of λ_. {} take k s drop k s] by (auto simp: valid_locks_def)
    with kl(2,3) Suc(3) *[of k]  l  i
    have valid_locks (locks(?xid := locks ?xid  {lock_for (s ! k)})) 
      s2pl (locks(?xid := locks ?xid  {lock_for (s ! k)})) (drop (Suc k) s)
      by (subst (asm) drop_eq_Cons)
        (auto simp: Let_def image_iff eq_xid_def[symmetric] set_drop split: if_splits)
    with kl(2) li Suc(3) obtain locks' where
      valid_locks locks'
      xid. (locks(?xid := locks ?xid  {lock_for (s ! k)})) xid  locks' xid
      s2pl locks' (drop l s)
      using take_drop_l unfolding drop_alt
      by (atomize_elim, intro s2pl_suffix[of _ take (l - Suc k) (drop (Suc k) s)])
        (auto simp del: drop_drop
          dest!: in_set_dropD[of _ n take (_ + n) _ for n, folded take_drop])
    with kl show False
      by (subst (asm) drop_eq_Cons, simp)
        (cases s ! k; cases s ! l;
          auto simp: valid_locks_def xid_def Let_def conflict_def split: if_splits)
  qed
  moreover have card (xid_of ` {x  set s. xid  xid_of x}) = card (xid_of ` set s - {xid})
    by (rule arg_cong) auto
  with Suc(2-) have conflict_serializable (filter (Not  (=) xid  xid_of) s)
    using s2pl_filter[of λ_. {} s xid] xid
    by (intro Suc(1)) (auto simp: fun_upd_idem card_Diff_subset_Int)
  then obtain t where t: serial t conflict_equivalent (filter (Not  (=) xid  xid_of) s) t
    unfolding conflict_serializable_def by blast
  ultimately have conflict_equivalent s (filter ((=) xid  xid_of) s @ t)
    by (auto elim!: conflict_equivalent_trans conflict_equivalent_append)
  moreover from t have serial (filter ((=) xid  xid_of) s @ t)
    by (intro serial_append)
      (auto dest!: conflict_equivalent_same_set intro: serial_same_xid[of _ xid])
  ultimately show ?case
    unfolding conflict_serializable_def by blast
qed

corollary s2pl_serializable: s2pl (λ_. {}) s  serializable s
  by (simp add: conflict_serializable_imp_serializable s2pl_conflict_serializable)

section ‹Example Executing S2PL›

text ‹To make the S2PL check executable regardless of the transaction id type, we restrict the
quantification to transaction ids that are occurring in the schedule.›

fun s2pl_code :: ('xid  'addr lock set)  ('xid, 'addr) schedule  bool where
  s2pl_code locks [] = True
| s2pl_code locks (a # s) = 
     (let xid = xid_of a; addr = action.addr_of a
     in if xid'  xid_of ` set s. xid'  xid  (X addr  locks xid'  isWrite a  S addr  locks xid')
     then False
     else s2pl_code (locks(xid := if xid  xid_of ` set s then {} else locks xid  {lock_for a})) s)

lemma s2pl_code_cong: "(xid. xid  xid_of ` set s  f xid = g xid) 
  (xid. xid  xid_of ` set s  f xid = {}) 
  s2pl f s = s2pl_code g s"
proof (induct s arbitrary: f g)
  case (Cons a s)
  from Cons(2-) show ?case
    unfolding s2pl.simps s2pl_code.simps Let_def
    by (intro if_cong[OF _ refl Cons(1)]) (auto 8 2 simp: image_iff)
qed simp

lemma s2pl_code[code_unfold]: "s2pl (λ_. {}) s = s2pl_code (λ_. {}) s"
  by (simp add: s2pl_code_cong)

definition "TB = (0 :: nat)"
definition "TA = (1 :: nat)"
definition "TC = (2 :: nat)"
definition "AX = (0 :: nat)"
definition "AY = (1 :: nat)"
definition "AZ = (2 :: nat)"

text ‹Good example involving a lock upgrade by termTA and termTB
lemma s2pl (λ_. {})
  [Write TB AZ, Read TA AX, Read TB AY, Read TC AX, Write TB AY, Write TC AY, Write TA AX, Write TA AY]
  by eval

text ‹Bad example: termTC cannot acquire exclusive lock for termAY, which is already held by termTA
lemma ¬ s2pl (λ_. {})
  [Read TA AX, Read TB AX, Read TC AX, Write TA AY, Write TC AY, Write TB AY, Write TA AZ]
  by eval

hide_const TB TA TC AX AY AZ
hide_fact TB_def TA_def TC_def AX_def AY_def AZ_def

(*<*)
end
(*>*)