Merge policies (mutation-based emit)

What you’ll learn: How to make repeated updates combine (sum, append, min/max) instead of overwriting one another.

Prerequisites: Understanding of InstanceEmit (see Instance emission).

Control how repeated updates to the same field are combined when multiple traversals contribute to one instance.

Why merge policies?

Default behavior (last-write-wins): When multiple traversals write to the same field on the same instance, each later write overwrites the earlier one.

Example:

  • Traversal 1 sets user.status = "active"
  • Traversal 2 sets user.status = "premium"
  • Result: user.status = "premium" (last write wins)

With merge policies: You can change this behavior to sum numbers, append to lists, take minimum/maximum values, etc.
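To make the contrast concrete, here is a minimal plain-Python sketch that does not use etielle at all. The `apply_updates` helper and its `policies` argument are hypothetical stand-ins for the idea, not the library's implementation.

```python
from typing import Any, Callable, Optional

Update = tuple[str, Any]  # (field name, new value)
Merge = Callable[[Any, Any], Any]  # (old value, new value) -> merged value


def apply_updates(updates: list[Update], policies: Optional[dict[str, Merge]] = None) -> dict[str, Any]:
    """Apply updates in order; fields without a policy are simply overwritten."""
    policies = policies or {}
    row: dict[str, Any] = {}
    for field, value in updates:
        if field in row and field in policies:
            # A policy is registered: combine the old and new values.
            row[field] = policies[field](row[field], value)
        else:
            # First write, or no policy: last write wins.
            row[field] = value
    return row


updates = [
    ("status", "active"),
    ("status", "premium"),
    ("logins", 1),
    ("logins", 1),
]

overwritten = apply_updates(updates)
merged = apply_updates(updates, {"logins": lambda old, new: old + new})

print(overwritten)  # {'status': 'premium', 'logins': 1}
print(merged)       # {'status': 'premium', 'logins': 2}
```

Only the field with a registered merge function changes behavior; `status` is still overwritten in both runs.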

Available policies

Policy                Behavior                        Use case
--------------------  ------------------------------  -----------------------------------
AddPolicy()           Adds numbers together           Counters, sums, totals
AppendPolicy()        Appends single items to list    Collecting individual tags/flags
ExtendPolicy()        Extends list with another list  Merging lists from different sources
MinPolicy()           Keeps minimum value             Finding earliest date, lowest score
MaxPolicy()           Keeps maximum value             Finding latest date, highest score
FirstNonNullPolicy()  Keeps first non-null value      Fallback defaults
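The behaviors in the table can be modeled as small two-argument merge functions. The sketch below is plain Python with hypothetical lowercase names (`add_policy`, `append_policy`, and so on); it illustrates the semantics described above and is not etielle's code. In particular, treating "no previous value" as `None` is an assumption made for the sketch.

```python
from functools import reduce


def add_policy(old, new):
    return new if old is None else old + new


def append_policy(old, new):
    # Each update contributes one item to the list.
    return [new] if old is None else [*old, new]


def extend_policy(old, new):
    # Each update contributes a whole list.
    return list(new) if old is None else [*old, *new]


def min_policy(old, new):
    return new if old is None else min(old, new)


def max_policy(old, new):
    return new if old is None else max(old, new)


def first_non_null_policy(old, new):
    return new if old is None else old


def fold(policy, values):
    """Merge a sequence of updates in arrival order, starting from 'no value'."""
    return reduce(policy, values, None)


results = {
    "add": fold(add_policy, [1, 1, 3]),
    "append": fold(append_policy, ["alpha", "beta"]),
    "extend": fold(extend_policy, [["a"], ["b", "c"]]),
    "min": fold(min_policy, ["2024-03-01", "2024-01-15"]),
    "max": fold(max_policy, [70, 95, 80]),
    "first_non_null": fold(first_non_null_policy, [None, "fallback", "later"]),
}

for name, value in results.items():
    print(f"{name}: {value!r}")
```

ISO-formatted date strings compare correctly as text, which is why the minimum in the sketch picks the earliest date without any parsing.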

Simple example: Counting with AddPolicy

from etielle.instances import InstanceEmit, FieldSpec, PydanticBuilder, AddPolicy
from etielle.transforms import get, literal
from etielle.core import MappingSpec, TraversalSpec
from etielle.executor import run_mapping
from pydantic import BaseModel

class User(BaseModel):
    id: str
    login_count: int = 0

# This emit increments login_count each time it's called
emit = InstanceEmit[User](
    table="users",
    join_keys=[get("id")],
    fields=[
        FieldSpec(selector="id", transform=get("id")),
        FieldSpec(selector="login_count", transform=literal(1)),  # Add 1 each time
    ],
    builder=PydanticBuilder(User),
    policies={"login_count": AddPolicy()},  # Sum instead of overwrite
)

# If two traversals emit for the same user:
# Traversal 1: login_count += 1  → 1
# Traversal 2: login_count += 1  → 2
# Result: user.login_count = 2

root = {"users": [{"id": "u1"}]}
mapping = MappingSpec(traversals=[
    TraversalSpec(path=["users"], mode="auto", emits=[emit]),
    TraversalSpec(path=["users"], mode="auto", emits=[emit]),  # Same emit twice
])

res = run_mapping(root, mapping)
u = list(res["users"].instances.values())[0]
print(f"login_count: {u.login_count}")  # Prints: login_count: 2

Output:

login_count: 2

Example: Multiple policies

from etielle.instances import InstanceEmit, FieldSpec, PydanticBuilder, AddPolicy, AppendPolicy
from etielle.transforms import get, literal
from etielle.core import field_of, MappingSpec, TraversalSpec
from etielle.executor import run_mapping
from pydantic import BaseModel

class User(BaseModel):
    id: str
    login_count: int
    tags: list[str]

emit = InstanceEmit[User](
    table="users",
    join_keys=[get("id")],
    fields=[
        FieldSpec(selector=field_of(User, lambda u: u.id), transform=get("id")),
        FieldSpec(selector=field_of(User, lambda u: u.login_count), transform=literal(1)),
        FieldSpec(selector=field_of(User, lambda u: u.tags), transform=literal("alpha")),
    ],
    builder=PydanticBuilder(User),
    policies={
        "login_count": AddPolicy(),
        "tags": AppendPolicy(),
    },
)

root = {"users": [{"id": "u1"}]}
emit2 = InstanceEmit[User](
    table="users",
    join_keys=[get("id")],
    fields=[
        FieldSpec(selector=field_of(User, lambda u: u.login_count), transform=literal(1)),
        FieldSpec(selector=field_of(User, lambda u: u.tags), transform=literal("beta")),
    ],
    builder=PydanticBuilder(User),
)
mapping = MappingSpec(traversals=[
    TraversalSpec(path=["users"], mode="auto", emits=[emit]),
    TraversalSpec(path=["users"], mode="auto", emits=[emit2]),
])

res = run_mapping(root, mapping)
u = list(res["users"].instances.values())[0]
print(u.login_count, u.tags)

Output:

2 ['alpha', 'beta']

Behavior and caveats

  • Policies are applied at update time, separately for each key and field.
  • Type mismatches are recorded as update errors; processing of the row continues.
  • Ordering is deterministic: updates are merged in the order their traversals arrive.
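The caveats above can be pictured with a small plain-Python model. It is only a sketch under stated assumptions: `merge_row` is a hypothetical helper, the error strings are invented for illustration, and etielle's actual error records may look different.

```python
from typing import Any, Callable

Merge = Callable[[Any, Any], Any]


def merge_row(updates: list[tuple[str, Any]], policies: dict[str, Merge]):
    """Merge updates in arrival order, collecting errors instead of aborting."""
    row: dict[str, Any] = {}
    errors: list[str] = []
    for index, (field, value) in enumerate(updates):
        policy = policies.get(field)
        if policy is None or field not in row:
            row[field] = value  # first write, or no policy registered
            continue
        try:
            row[field] = policy(row[field], value)
        except TypeError as exc:
            # Record the mismatch and keep the previous value; later updates still apply.
            errors.append(f"update {index} on {field!r}: {exc}")
    return row, errors


def combine(old, new):
    return old + new


updates = [
    ("id", "u1"),
    ("login_count", 1),
    ("login_count", "oops"),  # wrong type: int + str raises TypeError
    ("login_count", 2),
    ("tags", ["alpha"]),
    ("tags", ["beta"]),
]

row, errors = merge_row(updates, {"login_count": combine, "tags": combine})

print(row)     # {'id': 'u1', 'login_count': 3, 'tags': ['alpha', 'beta']}
print(errors)  # one entry, for the "oops" update
```

The bad update is skipped and reported, the counter still reaches 3, and the tag order mirrors the order in which the updates arrived.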

For more on how errors are reported, see Error reporting.

See also