Batching edits

How to operate on many resources efficiently.

Check's API operates on single resources — one employee, one workplace assignment, one benefit per request. This gives you a clear contract for every operation: one request, one response, one outcome. You always know exactly which operations succeeded and which failed, with full control over error handling, ordering, and retry logic.

This guide shows you how to build fast, bulk workflows on top of these single-resource endpoints using concurrent requests.

Rate limits

Check's API allows 25 requests per second per API key. A 429 Too Many Requests response includes a Retry-After header indicating how long to wait. For full details, see Rate Limits. The patterns in this guide are designed to stay within this limit while maximizing throughput.

Building blocks

The code below separates three concerns: making a single API call with rate-limit handling, running many calls concurrently, and the business logic of what to update. You write the rate-limit and concurrency logic once, then reuse it across every bulk operation you build. The examples use Python with httpx, but the patterns translate to any language.

A rate-limit-aware request

Rate-limited requests (429) are retried after the duration the API specifies. Other errors — including server errors — are returned directly so you can surface them:

import asyncio
import os

import httpx

API_KEY = os.environ["CHECK_API_KEY"]
BASE_URL = os.environ.get("CHECK_BASE_URL", "https://sandbox.checkhq.com")

RATE_LIMITED = 429
BATCH_SIZE = 20


async def request_with_retry(
    client: httpx.AsyncClient,
    method: str,
    url: str,
    *,
    json: dict | None = None,
    max_attempts: int = 3,
) -> httpx.Response:
    for _ in range(max_attempts):
        response = await client.request(method, url, json=json)

        if response.status_code == RATE_LIMITED:
            retry_after = int(response.headers.get("Retry-After", 2))
            await asyncio.sleep(retry_after)
            continue

        return response

    return response  # Final 429 — return it so the caller sees the rate limit
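
For example, a one-off lookup through the wrapper looks like any other httpx call. This sketch assumes a standard GET on the same /employees/{employee_id} path the guide patches later; adjust it to whichever resource you're fetching:

async def get_employee(employee_id: str) -> dict:
    async with httpx.AsyncClient(
        headers={"Authorization": f"Bearer {API_KEY}"},
        timeout=10.0,
    ) as client:
        response = await request_with_retry(
            client, "GET", f"{BASE_URL}/employees/{employee_id}"
        )
        response.raise_for_status()  # non-429 errors surface to the caller
        return response.json()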

A batched runner

This function manages parallelism. It takes a list of inputs and an operation to apply to each, and runs them in fixed-size batches — each batch runs concurrently, then the next batch starts. The per-request timeout is set tight so one slow response can't hold up an entire batch:

async def run_batch(items: list, operation, *, batch_size: int = BATCH_SIZE):
    results = []

    async with httpx.AsyncClient(
        headers={"Authorization": f"Bearer {API_KEY}"},
        timeout=10.0,
    ) as client:
        for i in range(0, len(items), batch_size):
            batch = items[i : i + batch_size]
            batch_results = await asyncio.gather(
                *[operation(client, item) for item in batch]
            )
            results.extend(batch_results)

    return results

These two building blocks — rate-limit handling and batched concurrency — are all the infrastructure you need. The rest is business logic.
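
One design note on run_batch: each batch waits for its slowest request before the next one begins. If your response times vary widely, a semaphore keeps a steady number of requests in flight instead. The sketch below is a hypothetical alternative (run_with_semaphore is a name introduced here, not part of the pattern above), built on the same request_with_retry wrapper:

async def run_with_semaphore(items: list, operation, *, max_in_flight: int = BATCH_SIZE):
    semaphore = asyncio.Semaphore(max_in_flight)

    async with httpx.AsyncClient(
        headers={"Authorization": f"Bearer {API_KEY}"},
        timeout=10.0,
    ) as client:

        async def bounded(item):
            async with semaphore:  # never more than max_in_flight at once
                return await operation(client, item)

        return await asyncio.gather(*[bounded(item) for item in items])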

Pattern: updating many resources

With the building blocks in place, a bulk update becomes a thin function that describes what to do, not how to manage concurrency or rate limits. Each operation identifies which resource it attempted alongside the response, so you can inspect status codes and error bodies however you like:

async def patch_employee(client: httpx.AsyncClient, item: tuple[str, dict]):
    employee_id, payload = item
    response = await request_with_retry(
        client, "PATCH", f"{BASE_URL}/employees/{employee_id}", json=payload,
    )
    return employee_id, response

Example: reassigning employees to new workplaces

async def reassign_employees(assignments: dict[str, str]):
    items = [
        (emp_id, {"workplaces": [workplace_id]})
        for emp_id, workplace_id in assignments.items()
    ]
    return await run_batch(items, patch_employee)


results = asyncio.run(
    reassign_employees({
        "emp_a8Bk2jQ9xLmN4pRsYv0T": "wrk_jK5lN8mQpS3uW7xZbC0f",
        "emp_cD3fG7hJkW1nP5qUzX8b": "wrk_rT6uV9wXyA2bD4eGhI1k",
        "emp_eF6gH9iMoR2sV4tYwA1d": "wrk_mN3oP6qRsU8vW0xYzB5c",
    })
)

Note: The workplaces field on an employee is a list. If an employee works at multiple locations, include all applicable workplace IDs — not just the new one.
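
If you need to add a workplace without dropping existing ones, read the employee first and send back the combined list. This sketch assumes a GET on /employees/{employee_id} returns the current workplaces array, and add_workplace is a name introduced here; confirm the field against the Employee resource:

async def add_workplace(client: httpx.AsyncClient, item: tuple[str, str]):
    employee_id, new_workplace_id = item

    # Fetch the employee's current workplace assignments.
    current = await request_with_retry(
        client, "GET", f"{BASE_URL}/employees/{employee_id}"
    )
    current.raise_for_status()
    workplaces = current.json().get("workplaces", [])

    # Append the new workplace, preserving existing assignments.
    if new_workplace_id not in workplaces:
        workplaces.append(new_workplace_id)

    return await patch_employee(client, (employee_id, {"workplaces": workplaces}))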

Scaling to other resources

Because the rate-limit and concurrency logic are separated out, supporting a new resource type means writing only the operation itself. Here's bulk benefit creation:

async def create_benefit(client: httpx.AsyncClient, item: dict):
    response = await request_with_retry(
        client, "POST", f"{BASE_URL}/employee_benefits", json=item,
    )
    return item.get("employee"), response


# Enroll employees in a 401(k)
employee_ids = [
    "emp_a8Bk2jQ9xLmN4pRsYv0T",
    "emp_cD3fG7hJkW1nP5qUzX8b",
    "emp_eF6gH9iMoR2sV4tYwA1d",
]

benefits = [
    {
        "employee": emp_id,
        "company_benefit": "cbn_hJ4kL7mNoQ0rS2tUvW9x",
        "effective_start": "2026-01-01",
        "company_contribution_amount": "200.00",
    }
    for emp_id in employee_ids
]

results = asyncio.run(run_batch(benefits, create_benefit))

Handling results

Every operation returns the resource ID it attempted alongside the API response. Check's error responses include a type field (e.g. validation_error, not_found_error) and may include input_errors with per-field details — use these to give your users specific feedback.

Handle partial failure as a normal case. If you're building a UI where an employer reassigns employees, show something like: "195 of 200 employees updated. 5 failed — click to review."
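
As a sketch, here's one way to partition results and pull out error details. The summarize helper is a name introduced here; the type and input_errors fields follow the description above, so verify the exact body shape against Check's error documentation:

def summarize(results: list[tuple[str, httpx.Response]]) -> None:
    succeeded = [(rid, resp) for rid, resp in results if resp.is_success]
    failed = [(rid, resp) for rid, resp in results if not resp.is_success]

    print(f"{len(succeeded)} of {len(results)} updated. {len(failed)} failed.")

    for resource_id, response in failed:
        error = response.json()
        # e.g. "validation_error", plus per-field details when present
        print(resource_id, error.get("type"), error.get("input_errors"))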

Summary

  • A rate-limit-aware request wrapper handles 429 retries so your business logic doesn't have to.
  • A batched runner manages concurrency — write one operation per resource type and reuse the rest.
  • Combine list endpoints with batch updates for end-to-end bulk workflows: fetch the IDs you need, then apply the edits in batches, as sketched below.
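
As a sketch of that last point, page through a list endpoint to collect IDs, then feed them into run_batch. The pagination shape below (a results array plus a next URL) and the company query parameter are assumptions; check the List Employees reference for the exact fields:

async def list_all_employee_ids(company_id: str) -> list[str]:
    ids: list[str] = []

    async with httpx.AsyncClient(
        headers={"Authorization": f"Bearer {API_KEY}"},
        timeout=10.0,
    ) as client:
        url = f"{BASE_URL}/employees?company={company_id}"
        while url:
            response = await request_with_retry(client, "GET", url)
            response.raise_for_status()
            page = response.json()
            ids.extend(employee["id"] for employee in page["results"])
            url = page.get("next")  # assumed: URL of the next page, or None

    return ids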