Collections — low-level async API

tip

For most use cases prefer the higher-level Batch API (submit_batch()) — it handles collection, run, jobs, and result fetching in one object with streaming results, progress tracking, and callbacks.

Use this page when you need manual control over the collection/run/job lifecycle (e.g. reusing a collection across multiple runs, custom polling, inspecting state).

For very large batches (1,000+ URLs) or when you need different parameters per URL, use collections with server-side execution.

Create and run

# Create a collection — use custom_id to map jobs to your data
col = client.create_collection("my-batch", [
    {"url": "https://example.com/tour/1", "custom_id": "tour_1"},
    {"url": "https://example.com/tour/2", "custom_id": "tour_2", "browser": True},
    {"url": "https://example.com/tour/3", "custom_id": "tour_3", "format": "markdown"},
])

# Run and wait for completion
run = client.run_and_wait(col.id, timeout=300)

# Stream jobs lazily — each job has url + custom_id for traceability
for job in client.iter_run_jobs(col.id, run.run_id):
    if job.status == "completed":
        result = client.get_job_result(col.id, run.run_id, job.job_public_id)
        save(my_id=job.custom_id, content=result.content)

URL traceability with custom_id

Each request can carry an arbitrary custom_id string that the API echoes back in JobExecutionPublic.custom_id and ScrapeResponse.custom_id. Use it to map results to your database without depending on order.

# Submit
col = client.create_collection("reviews-daily", [
    {"url": tour["url"], "custom_id": tour["id"]} for tour in my_tours
])

# Map results
run = client.run_and_wait(col.id)
results_by_id = {}
for job in client.iter_run_jobs(col.id, run.run_id):
    if job.status == "completed":
        result = client.get_job_result(col.id, run.run_id, job.job_public_id)
        results_by_id[job.custom_id] = result

You can submit the same URL with different custom_id values (deduped by (url, custom_id)) — useful for re-scraping with different processing pipelines.
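
To make the `(url, custom_id)` dedup key concrete, here is a minimal client-side sketch. `dedupe_requests` is a hypothetical helper, not part of the SDK, and keeping the first occurrence of a repeated pair is an assumption of this sketch rather than documented server behavior.

```python
def dedupe_requests(requests):
    """Drop entries whose (url, custom_id) pair was already seen.

    Hypothetical helper: mirrors the documented dedup key client-side.
    Keeping the first occurrence is an assumption of this sketch.
    """
    seen = set()
    unique = []
    for req in requests:
        key = (req["url"], req.get("custom_id"))
        if key not in seen:
            seen.add(key)
            unique.append(req)
    return unique


payload = [
    {"url": "https://example.com/tour/1", "custom_id": "pipeline_a"},
    {"url": "https://example.com/tour/1", "custom_id": "pipeline_b"},  # same URL, new ID: kept
    {"url": "https://example.com/tour/1", "custom_id": "pipeline_a"},  # exact repeat: dropped
]
unique_payload = dedupe_requests(payload)
```

Running a payload through a helper like this before submitting lets you predict how many jobs to expect in the run.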

Iterating jobs

Four ways depending on the size of your batch:

# 1. Stream lazily (recommended for >500 jobs)
for job in client.iter_run_jobs(col.id, run.run_id):
    process(job)

# 2. Filter by status
for job in client.iter_run_jobs(col.id, run.run_id, status_filter="completed"):
    process(job)

# 3. Fetch all into memory (default behavior, paginates internally)
jobs = client.get_run_jobs(col.id, run.run_id)
print(f"{len(jobs.items)} jobs total")

# 4. Manual pagination (process every page, including the last one)
cursor = None
while True:
    page = client.get_run_jobs(col.id, run.run_id, cursor=cursor, limit=500)
    for job in page.items:
        process(job)
    if not page.has_more:
        break
    cursor = page.cursor_next
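
The manual pagination pattern can also be wrapped in a generator so callers never have to think about the final page. The sketch below is self-contained: `Page` is a stand-in with the same `items`, `has_more`, and `cursor_next` attributes used in the snippet above, and `fetch_page` is a hypothetical callable that in real code would wrap `client.get_run_jobs(...)`.

```python
from dataclasses import dataclass, field
from typing import Callable, Iterator, List, Optional


@dataclass
class Page:
    """Stand-in for the page object returned by get_run_jobs (assumed shape)."""
    items: List[str] = field(default_factory=list)
    has_more: bool = False
    cursor_next: Optional[str] = None


def iter_all_items(fetch_page: Callable[[Optional[str]], Page]) -> Iterator[str]:
    """Yield every item from every page, including the final one."""
    cursor = None
    while True:
        page = fetch_page(cursor)
        yield from page.items
        if not page.has_more:
            break
        cursor = page.cursor_next


# Fake backend with three pages, keyed by cursor, for demonstration only.
_FAKE_PAGES = {
    None: Page(["job_1", "job_2"], True, "c1"),
    "c1": Page(["job_3", "job_4"], True, "c2"),
    "c2": Page(["job_5"], False, None),
}

all_jobs = list(iter_all_items(lambda cursor: _FAKE_PAGES[cursor]))
```

The loop checks `has_more` only after yielding the current page, which is what guarantees the last page is not skipped.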

Rolling your own polling loop

If you poll a still-running collection with a custom loop around client.iter_run_jobs() (the typical pattern in code written before v0.7.x), be aware of one caveat: the polling-efficiency optimisations added in v0.7.3 and v0.7.4 live in the high-level helpers, not at the wire layer. A custom loop bypasses them.

What you give up by keeping a custom polling loop:

  • Adaptive poll_interval (v0.7.3) — the SDK picks 5 / 10 / 15 / 30 s based on batch size so large runs don't burn the rate budget. You're hardcoding a value.
  • Counter short-circuit (v0.7.4) — skipping the jobs-page query when run.success_requests + failed_requests + timeout_requests hasn't moved since the previous tick. Cuts ~50% of polling requests on long-running batches.
  • Resilience: transient 5xx / 429 retry, parallel result fetching, progress tracking.
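
As a rough illustration of the adaptive interval idea, a tiered lookup might look like the sketch below. Only the 5 / 10 / 15 / 30 s tiers come from the text above. The batch-size breakpoints are invented for this example and are not the SDK's actual table; use the exported `adaptive_poll_interval` in real code.

```python
# Hypothetical breakpoints: the documentation only states the 5/10/15/30 s
# tiers, not the batch sizes at which the SDK switches between them.
ASSUMED_TIERS = [(100, 5), (1_000, 10), (10_000, 15)]
LARGEST_INTERVAL = 30


def sketch_poll_interval(total_requests: int) -> int:
    """Return a poll interval in seconds that grows with batch size."""
    for max_size, seconds in ASSUMED_TIERS:
        if total_requests <= max_size:
            return seconds
    return LARGEST_INTERVAL
```

The point of the design is that a hardcoded `time.sleep(5)` is fine for a few dozen URLs but issues far more requests than needed on a run that takes hours.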

If your pattern is "iterate completed jobs, fetch each result, process", that's exactly what client.iter_results(cid, rid) does — and it carries every optimisation listed above. Practically a drop-in:

import time

# Before: custom loop, no optimisations:
batch = client.get_batch(cid, rid)
while not batch.is_finished:
    batch.refresh()
    for job in client.iter_run_jobs(cid, rid, status_filter="completed"):
        result = client.get_job_result(cid, rid, job.job_public_id)
        process(result, job.custom_id)
    time.sleep(5)

# After: high-level, optimisations baked in:
for result in client.iter_results(cid, rid):
    process(result, result.custom_id)

This also works for reattaching to an existing run (the typical reason to use get_batch + iter_run_jobs separately):

# After a crash / restart, persisted (cid, rid) → just iterate:
for result in client.iter_results(saved_cid, saved_rid):
    process(result, result.custom_id)

Pass submitted_count=N if you persisted len(payload) and want batch.summary() to report a complete picture after the loop exits (see Batch API → Reattaching).

If you can't migrate: replicate the pattern manually

Legitimate reasons to keep the custom loop exist — integration with your own scheduler, fine-grained per-job retry logic, side effects on the polling cadence itself. The recipe below replicates the v0.7.4 optimisations in caller code:

import time
from scrapingpros import adaptive_poll_interval

last_terminal = -1        # sentinel: first tick always fetches
last_completed_at = None  # high-water mark for incremental iter_run_jobs

while True:
    run = client.get_run(cid, rid)

    # Counter short-circuit: skip the jobs-page query when nothing
    # has moved server-side since the previous tick.
    current_terminal = (
        (run.success_requests or 0)
        + (run.failed_requests or 0)
        + (run.timeout_requests or 0)
    )
    if current_terminal != last_terminal and run.all_jobs_persisted is not False:
        for job in client.iter_run_jobs(
            cid, rid,
            status_filter=["completed", "failed", "timeout"],  # v0.7.6: list/CSV
            since_completed_at=last_completed_at,
        ):
            if job.status == "completed":
                result = client.get_job_result(cid, rid, job.job_public_id)
                process(result, job.custom_id)
            else:
                process_failure(job)
            if job.completed_at is not None:
                last_completed_at = job.completed_at
        last_terminal = current_terminal

    if run.status in ("completed", "failed", "cancelled"):
        break

    # Adaptive cadence sized to the batch: small batches stay
    # responsive, long batches don't saturate the rate limit.
    time.sleep(adaptive_poll_interval(run.total_requests))

adaptive_poll_interval(n, kind="jobs") is exported from the top-level package since v0.7.3; kind="status" returns the tighter table for status-only polling. The recipe above uses "jobs" because each tick that fires the inner loop is jobs-page-heavy.

iter_run_jobs accepts since_completed_at= so the inner loop only paginates jobs that completed after the previous tick's high-water mark — that's how Batch.iter_results avoids re-reading every job on every tick. Since v0.7.6 status_filter also accepts a list or CSV string, so a single paginated stream drains all three terminal states (one request instead of three). The all_jobs_persisted guard (v0.7.6+) skips the inner drain while the server is still seeding jobs — Batch.iter_results does this automatically.
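
The counter short-circuit can be isolated into a small object and exercised without a live API. The sketch below is an illustration only: `TerminalCounter` is a hypothetical name, the run snapshots are fake data built with `SimpleNamespace`, and the `all_jobs_persisted` guard from the recipe is deliberately left out to keep the example short.

```python
from types import SimpleNamespace


class TerminalCounter:
    """Remembers the last terminal-job total and reports whether it moved."""

    def __init__(self):
        self._last = -1  # sentinel so the first check always reports a change

    def moved(self, run) -> bool:
        # Counters may be None on some runs, so treat None as zero.
        current = (
            (run.success_requests or 0)
            + (run.failed_requests or 0)
            + (run.timeout_requests or 0)
        )
        changed = current != self._last
        self._last = current
        return changed


# Simulated run snapshots across four polling ticks (fake data).
ticks = [
    SimpleNamespace(success_requests=0, failed_requests=0, timeout_requests=0),
    SimpleNamespace(success_requests=0, failed_requests=0, timeout_requests=0),
    SimpleNamespace(success_requests=3, failed_requests=1, timeout_requests=None),
    SimpleNamespace(success_requests=3, failed_requests=1, timeout_requests=None),
]
counter = TerminalCounter()
fetch_decisions = [counter.moved(run) for run in ticks]
```

In this simulation the jobs-page query would run on the first and third ticks only, which is the saving the short-circuit is meant to deliver on slow-moving batches.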

JobExecutionPublic fields

Every job in iter_run_jobs() / get_run_jobs() contains:

| Field | Description |
| --- | --- |
| job_public_id | Unique job ID, used to fetch the result |
| url | The URL that was scraped |
| custom_id | Your traceability ID (echoed back from the request) |
| status | "processing", "completed", "failed", "timeout" |
| status_code | HTTP status from the target site |
| is_success | Server verdict of whether the job produced usable content: True / False / None (legacy) |
| queued_at, started_at, completed_at | Lifecycle timestamps (datetime) |
| execution_time_ms | Total execution time in milliseconds |
| retries_attempted | Internal retry count |
| block_reason | Why the job was flagged as blocked (if any) |
| protection_stack | Detected protections (e.g. ["cloudflare", "datadome"]) |
| rule_hits | Validator rules that matched |
| has_extractable_data | True / False / None: whether the page contained structured data (JSON-LD, microdata, OpenGraph, __NEXT_DATA__). Independent of is_success. (v0.5.0+) |
| validator_version | Version of the HTML Validator that produced is_success and friends. Pin in tests to detect classifier upgrades. (v0.5.0+) |
| client_id | Client account that owns the job. (v0.5.0+) |
| url_truncated | True if the URL was longer than 2048 chars and got truncated |

Use is_success — don't re-implement the check

is_success is the server's authoritative verdict — the same one used to compute run.success_requests. Prefer it over writing your own check on status_code + body size: the server catches soft-blocks (Google CAPTCHA pages with 200 + large body, Amazon "Robot Check", etc.) that a simple heuristic misses.

for job in client.iter_run_jobs(col.id, run.run_id):
    if job.is_success:
        result = client.get_job_result(col.id, run.run_id, job.job_public_id)
        save(result)
    else:
        log_failed(job.url, reason=job.block_reason or f"http_{job.status_code}")

The Batch.iter_results() API already honors this internally — result.guidance.success reflects job.is_success.
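
Because is_success is tri-state, a plain truthiness check groups legacy jobs (None) together with real failures. If you want to keep them apart, one possible approach is sketched below. `classify` and the label strings are hypothetical, and the jobs are fake objects built with `SimpleNamespace`.

```python
from types import SimpleNamespace


def classify(job) -> str:
    """Map the tri-state is_success flag to a label without guessing on None."""
    if job.is_success is True:
        return "success"
    if job.is_success is False:
        return "failed"
    return "unknown"  # legacy job with no server verdict


# Fake jobs covering the three possible values.
sample_jobs = [
    SimpleNamespace(custom_id="a", is_success=True),
    SimpleNamespace(custom_id="b", is_success=False),
    SimpleNamespace(custom_id="c", is_success=None),
]
labels = {job.custom_id: classify(job) for job in sample_jobs}
```

How to treat the "unknown" bucket (retry, inspect manually, or count as failed) is a policy decision for your own pipeline.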

Success criterion (policy pinning)

Each run carries the classification policy as metadata. Pin the version in integration tests to catch silent policy changes:

run = client.get_run(col.id, run.run_id)
assert run.success_criterion.version == "content_success_v1"

The current policy content_success_v1 classifies a job as success when all of these hold:

  • status == "completed"
  • 200 <= status_code < 300
  • potentiallyBlockedByCaptcha is false
  • block_reason is null or "none"

If the policy ever changes, the version bumps (e.g. v2) and your pinned test fails loudly.
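
The four conditions can be written as a single predicate, which may be useful as a fixture in the pinned integration test. This is only a sketch of the policy as listed above: `matches_content_success_v1` is a hypothetical name, the job is modelled as a plain dict, and treating a missing potentiallyBlockedByCaptcha key as false is an assumption. In production code, is_success remains the authoritative verdict.

```python
def matches_content_success_v1(job: dict) -> bool:
    """Return True when a job dict satisfies all four listed conditions."""
    status_code = job.get("status_code")
    return (
        job.get("status") == "completed"
        and status_code is not None
        and 200 <= status_code < 300
        # Assumption: an absent flag is treated the same as false.
        and not job.get("potentiallyBlockedByCaptcha", False)
        and job.get("block_reason") in (None, "none")
    )


ok_job = {
    "status": "completed",
    "status_code": 200,
    "potentiallyBlockedByCaptcha": False,
    "block_reason": None,
}
# Same job, but flagged as a probable CAPTCHA page despite the 200.
captcha_job = {**ok_job, "potentiallyBlockedByCaptcha": True}
```

The second sample shows the soft-block case: a 200 response that still fails the policy because the CAPTCHA flag is set.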

Webhooks

Get notified when a batch completes instead of polling:

col = client.create_collection(
    "my-batch",
    [{"url": "https://example.com/1"}, {"url": "https://example.com/2"}],
    callback_url="https://your-server.com/webhook",
)
run = client.create_run(col.id)

# Your server receives a POST when done:
# {"event": "run.completed", "run_id": "...", "job_ids": [...]}
# Signed with HMAC-SHA256 in X-SP-Signature header

Check delivery status:

run = client.get_run(col.id, run.run_id)
print(run.callback_status) # "sent", "pending", "failed", "retrying"
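
On the receiving side, the HMAC-SHA256 signature in X-SP-Signature should be checked before trusting the payload. The sketch below uses only the Python standard library. It assumes the header carries a hex digest computed over the raw request body with a shared secret. The encoding, the signed content, and how the secret is provisioned are assumptions here, so confirm them against the webhook reference before relying on this.

```python
import hashlib
import hmac


def verify_signature(raw_body: bytes, signature_header: str, secret: bytes) -> bool:
    """Check an HMAC-SHA256 signature using a constant-time comparison.

    Assumes the header holds the hex digest of the raw request body.
    """
    expected = hmac.new(secret, raw_body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected.encode(), signature_header.encode())


# Self-check with a made-up secret and body (not real credentials).
secret = b"example-secret"
body = b'{"event": "run.completed", "run_id": "r1", "job_ids": []}'
good_signature = hmac.new(secret, body, hashlib.sha256).hexdigest()
is_valid = verify_signature(body, good_signature, secret)
```

Verify against the exact bytes received, before any JSON parsing, since re-serialising the payload can change whitespace and invalidate the digest.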

Manual polling

import time

run = client.create_run(col.id)
while True:
    run = client.get_run(col.id, run.run_id)
    if run.status in ("completed", "failed", "cancelled"):
        break
    print(f"Progress: {run.success_requests}/{run.total_requests}")
    time.sleep(5)
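
The loop above runs forever if a run never reaches a terminal state. A possible variant with a deadline is sketched below. `poll_until_terminal` is a hypothetical helper, `get_run` stands in for a call such as `lambda: client.get_run(col.id, run.run_id)`, and the "processing" status used in the demo is a placeholder for whatever non-terminal value the API reports.

```python
import time
from types import SimpleNamespace

TERMINAL = ("completed", "failed", "cancelled")


def poll_until_terminal(get_run, interval=5.0, timeout=300.0,
                        sleep=time.sleep, clock=time.monotonic):
    """Call get_run() until the run is terminal or the deadline passes."""
    deadline = clock() + timeout
    while True:
        run = get_run()
        if run.status in TERMINAL:
            return run
        if clock() >= deadline:
            raise TimeoutError(f"run still {run.status!r} after {timeout}s")
        sleep(interval)


# Demo with a fake run that finishes on the third poll; sleep is a no-op.
_statuses = iter(["processing", "processing", "completed"])
final_run = poll_until_terminal(
    lambda: SimpleNamespace(status=next(_statuses)),
    sleep=lambda seconds: None,
)
```

Injecting `sleep` and `clock` keeps the helper testable without real waiting. In production the defaults apply.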

Collection management

# List all collections
collections = client.list_collections()

# Get a specific collection
col = client.get_collection("collection-id")

# Update a collection
client.update_collection("collection-id", "new-name", [
    {"url": "https://example.com/updated"},
])

# Delete (runs are not affected)
client.delete_collection("collection-id")

Retention

  • Job metadata (status, timings, custom_id, URL) — retained 90 days
  • HTML / markdown / extracted_data — available for 48 hours after job completion

For longer-term archival, save the result on your side immediately after fetching.
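
One simple way to archive on your side is to write each result to disk as soon as it is fetched, keyed by custom_id. The sketch below is a minimal illustration using local JSON files. `archive_result` is a hypothetical helper, and it assumes custom_id values are safe to use as file names.

```python
import json
import tempfile
from pathlib import Path


def archive_result(directory: Path, custom_id: str, content: str, metadata: dict) -> Path:
    """Write one result to <directory>/<custom_id>.json and return the path."""
    directory.mkdir(parents=True, exist_ok=True)
    # Assumption: custom_id contains no path separators or reserved characters.
    path = directory / f"{custom_id}.json"
    path.write_text(
        json.dumps({"custom_id": custom_id, "metadata": metadata, "content": content}),
        encoding="utf-8",
    )
    return path


# Demo in a temporary directory with placeholder content.
archive_dir = Path(tempfile.mkdtemp()) / "archive"
saved_path = archive_result(
    archive_dir,
    "tour_1",
    "<html>placeholder</html>",
    {"url": "https://example.com/tour/1"},
)
```

In a real pipeline you would call something like this inside the result loop, passing the content and metadata returned for each job, or swap the file write for object storage or a database insert.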