class V2Pool(Actor):
    """Constant product AMM with platform LP token.

    The pool tracks reserves of two tokens (kept in sorted order so that
    ``token_a < token_b``), mints an LP token to liquidity providers,
    optionally consults a hook before swaps and liquidity changes, and
    accumulates prices for a TWAP oracle from a scheduled timer callback.
    """

    def init(
        self,
        token_a: bytes32,
        token_b: bytes32,
        fee_bps: u16,
        hook: address | None = None
    ):
        """Initialise pool state, create the LP token and schedule the oracle.

        Args:
            token_a: Identifier of one pool token.
            token_b: Identifier of the other pool token; must differ from
                ``token_a``. The pair is stored in sorted order.
            fee_bps: Swap fee forwarded to ``amm_get_amount_out``; by its
                name it is expressed in basis points, so it must be below
                10_000.
            hook: Optional hook address consulted before swaps and
                liquidity changes. A falsy value disables hook calls.

        Raises:
            Whatever ``require`` raises when the tokens are identical or the
            fee is out of range.
        """
        require(token_a != token_b, "identical tokens")
        # A fee of 100% or more would leave nothing to trade with.
        require(fee_bps < 10_000, "invalid fee")

        # Sort tokens for canonical ordering
        if token_a > token_b:
            token_a, token_b = token_b, token_a

        self.token_a = token_a
        self.token_b = token_b
        self.fee_bps = fee_bps
        self.hook = hook
        self.reserve_a = 0
        self.reserve_b = 0

        # Create platform LP token
        self.lp_token = Token.create(
            name="Cowswap V2 LP",
            symbol="COW-V2-LP",
            decimals=18,
            initial_supply=0
        )

        # TWAP oracle (updated via timer)
        self.price_cumulative_a = 0
        self.price_cumulative_b = 0
        self.last_block = block.height

        # Schedule TWAP updates
        # NOTE(review): interval=1 is presumably "every block" -- confirm
        # the unit against the Actor timer API.
        self.schedule_timer(interval=1, handler="update_oracle")

    def swap(
        self,
        token_in: bytes32,
        amount_in: u256,
        min_amount_out: u256,
        recipient: address
    ) -> u256:
        """Swap ``amount_in`` of ``token_in`` for the other pool token.

        Args:
            token_in: Token being sold; must be one of the pool's tokens.
            amount_in: Amount of ``token_in`` pulled from ``msg.sender``.
            min_amount_out: Smallest acceptable output amount.
            recipient: Address that receives the output token.

        Returns:
            The amount of the output token sent to ``recipient``.

        Raises:
            Whatever ``require`` raises when the token is not in the pool,
            the input is zero, the hook rejects the swap, the pool has no
            liquidity, or the output is below ``min_amount_out``.
        """
        require(token_in in (self.token_a, self.token_b), "invalid token")
        require(amount_in > 0, "zero input")

        # Hook check
        if self.hook:
            allowed = Hook(self.hook).can_swap(
                self.address, msg.sender, token_in, amount_in, min_amount_out
            )
            require(allowed, "swap blocked by hook")

        # Determine direction
        is_a_to_b = token_in == self.token_a
        reserve_in = self.reserve_a if is_a_to_b else self.reserve_b
        reserve_out = self.reserve_b if is_a_to_b else self.reserve_a

        # An unfunded pool has no price; reject before calling the primitive.
        require(reserve_in > 0 and reserve_out > 0, "insufficient liquidity")

        # Calculate output using platform primitive
        amount_out = amm_get_amount_out(
            amount_in, reserve_in, reserve_out, self.fee_bps
        )
        require(amount_out >= min_amount_out, "slippage")

        # Transfer tokens
        # NOTE(review): reserves are updated after the external transfers,
        # as in the original. If the platform allows re-entrant calls during
        # Token.transfer, consider updating reserves first -- confirm.
        Token.transfer_from(token_in, msg.sender, self.address, amount_in)
        token_out = self.token_b if is_a_to_b else self.token_a
        Token.transfer(token_out, recipient, amount_out)

        # Update reserves
        if is_a_to_b:
            self.reserve_a += amount_in
            self.reserve_b -= amount_out
        else:
            self.reserve_b += amount_in
            self.reserve_a -= amount_out

        # Hook notification
        if self.hook:
            Hook(self.hook).on_swap(
                self.address, msg.sender, token_in, amount_in, amount_out
            )

        emit_event("Swap", {
            "sender": msg.sender,
            "token_in": token_in,
            "amount_in": amount_in,
            "amount_out": amount_out,
            "recipient": recipient
        })
        return amount_out

    def add_liquidity(
        self,
        amount_a_desired: u256,
        amount_b_desired: u256,
        amount_a_min: u256,
        amount_b_min: u256,
        recipient: address
    ) -> (u256, u256, u256):
        """Deposit both tokens and mint LP tokens to ``recipient``.

        The first deposit (both reserves zero) is taken as given and sets
        the ratio. Later deposits are trimmed to the current reserve ratio
        using ``amm_quote``.

        Args:
            amount_a_desired: Maximum amount of ``token_a`` to deposit.
            amount_b_desired: Maximum amount of ``token_b`` to deposit.
            amount_a_min: Minimum acceptable ``token_a`` deposit (only
                checked when the ``token_a`` side is trimmed).
            amount_b_min: Minimum acceptable ``token_b`` deposit (only
                checked when the ``token_b`` side is trimmed).
            recipient: Address that receives the minted LP tokens.

        Returns:
            ``(amount_a, amount_b, lp_amount)``: the amounts actually
            deposited and the LP tokens minted to ``recipient``.

        Raises:
            Whatever ``require`` raises when the hook rejects the deposit, a
            trimmed amount violates its minimum, or no LP tokens would be
            minted.
        """
        # Hook check
        if self.hook:
            allowed = Hook(self.hook).can_add_liquidity(
                self.address, msg.sender, amount_a_desired, amount_b_desired
            )
            require(allowed, "add liquidity blocked by hook")

        # Calculate optimal amounts
        if self.reserve_a == 0 and self.reserve_b == 0:
            # First deposit sets the ratio
            amount_a = amount_a_desired
            amount_b = amount_b_desired
        else:
            # Match current ratio
            amount_b_optimal = amm_quote(
                amount_a_desired, self.reserve_a, self.reserve_b
            )
            if amount_b_optimal <= amount_b_desired:
                require(amount_b_optimal >= amount_b_min, "slippage B")
                amount_a = amount_a_desired
                amount_b = amount_b_optimal
            else:
                amount_a_optimal = amm_quote(
                    amount_b_desired, self.reserve_b, self.reserve_a
                )
                require(amount_a_optimal <= amount_a_desired, "slippage A")
                require(amount_a_optimal >= amount_a_min, "slippage A")
                amount_a = amount_a_optimal
                amount_b = amount_b_desired

        # Transfer tokens
        Token.transfer_from(self.token_a, msg.sender, self.address, amount_a)
        Token.transfer_from(self.token_b, msg.sender, self.address, amount_b)

        # Mint LP tokens
        minimum_liquidity = 1000  # permanently locked on the first deposit
        total_supply = Token.total_supply(self.lp_token)
        if total_supply == 0:
            # NOTE(review): sqrt is a platform helper; assumed to return an
            # integer -- confirm.
            root = sqrt(amount_a * amount_b)
            # Guard before subtracting so a tiny first deposit cannot
            # underflow (or go negative) and cannot trigger the lock mint.
            require(root > minimum_liquidity, "insufficient liquidity minted")
            lp_amount = root - minimum_liquidity
            Token.mint(self.lp_token, address(0), minimum_liquidity)
        else:
            # Floor division keeps the share integral; true division ("/")
            # would produce a float.
            lp_amount = min(
                amount_a * total_supply // self.reserve_a,
                amount_b * total_supply // self.reserve_b
            )
        require(lp_amount > 0, "insufficient liquidity minted")
        Token.mint(self.lp_token, recipient, lp_amount)

        # Update reserves
        self.reserve_a += amount_a
        self.reserve_b += amount_b

        emit_event("AddLiquidity", {
            "sender": msg.sender,
            "amount_a": amount_a,
            "amount_b": amount_b,
            "lp_minted": lp_amount,
            "recipient": recipient
        })
        return (amount_a, amount_b, lp_amount)

    def remove_liquidity(
        self,
        lp_amount: u256,
        amount_a_min: u256,
        amount_b_min: u256,
        recipient: address
    ) -> (u256, u256):
        """Burn LP tokens and pay out the matching share of both reserves.

        Args:
            lp_amount: LP tokens pulled from ``msg.sender`` and burned.
            amount_a_min: Minimum acceptable ``token_a`` payout.
            amount_b_min: Minimum acceptable ``token_b`` payout.
            recipient: Address that receives both payouts.

        Returns:
            ``(amount_a, amount_b)``: the amounts sent to ``recipient``.

        Raises:
            Whatever ``require`` raises when ``lp_amount`` is zero, the hook
            rejects the withdrawal, no LP tokens exist, or a payout is below
            its minimum.
        """
        require(lp_amount > 0, "zero liquidity")

        # Hook check
        if self.hook:
            allowed = Hook(self.hook).can_remove_liquidity(
                self.address, msg.sender, lp_amount
            )
            require(allowed, "remove liquidity blocked by hook")

        total_supply = Token.total_supply(self.lp_token)
        # Without this the share computation below divides by zero.
        require(total_supply > 0, "no liquidity")

        # Calculate token amounts (floor division: amounts must stay integral)
        amount_a = lp_amount * self.reserve_a // total_supply
        amount_b = lp_amount * self.reserve_b // total_supply
        require(amount_a >= amount_a_min, "slippage A")
        require(amount_b >= amount_b_min, "slippage B")

        # Burn LP tokens
        Token.transfer_from(self.lp_token, msg.sender, self.address, lp_amount)
        Token.burn(self.lp_token, lp_amount)

        # Transfer tokens
        Token.transfer(self.token_a, recipient, amount_a)
        Token.transfer(self.token_b, recipient, amount_b)

        # Update reserves
        self.reserve_a -= amount_a
        self.reserve_b -= amount_b

        emit_event("RemoveLiquidity", {
            "sender": msg.sender,
            "lp_burned": lp_amount,
            "amount_a": amount_a,
            "amount_b": amount_b,
            "recipient": recipient
        })
        return (amount_a, amount_b)

    def update_oracle(self):
        """Timer callback: update TWAP oracle.

        Adds ``price * blocks_elapsed`` to each cumulative price, where the
        price is the reserve ratio scaled by 2**128, then records the
        current block height.
        """
        blocks_elapsed = block.height - self.last_block
        if blocks_elapsed <= 0:
            return

        if self.reserve_a > 0 and self.reserve_b > 0:
            # Accumulate price * time. Floor division keeps the fixed-point
            # value an integer; true division ("/") would produce a float.
            price_a = (self.reserve_b << 128) // self.reserve_a
            price_b = (self.reserve_a << 128) // self.reserve_b
            self.price_cumulative_a += price_a * blocks_elapsed
            self.price_cumulative_b += price_b * blocks_elapsed

        # Always advance the marker, even while the pool is unfunded, so
        # that idle blocks are never charged at the first funded price.
        self.last_block = block.height

    def get_twap(self, token: bytes32, period_blocks: u64) -> u256:
        """Get time-weighted average price over period.

        Args:
            token: Token whose price is requested.
            period_blocks: Length of the averaging window, in blocks.

        Note:
            The body is a placeholder (``...``); no value is computed here.
        """
        # Implementation uses price_cumulative snapshots
        ...