Shopify's Bulk Operations API was built for exactly this. You fire off a single GraphQL request to kick off the operation, Shopify does the heavy lifting in the background, and when it's done it hands you a JSONL file to consume. The catch is that your application needs to manage that job — submitting it, polling its status, and acting on the result. Do that in a cron-driven, MySQL-queued PHP setup and you have something robust enough to run unattended.
This post walks through the PHP methodology for both a bulk mutation (setting inventory levels) and a bulk query (fetching non-taxable variants), with a MySQL job queue and a cron job to keep everything ticking over.
How Bulk Operations Work
There are two flavours. A bulkOperationRunMutation takes a JSONL file of input objects and runs the same mutation against each one. A bulkOperationRunQuery runs a single query across your entire catalogue, with Shopify handling pagination for you, so there are no cursors or page sizes to manage. Both return a bulk operation ID immediately. You then poll the operation's status until it reaches COMPLETED, fetch the JSONL result URL, and process it.
The lifecycle looks like this:
1. Submit the bulk operation → receive its operation ID
2. Poll the operation's status → wait for COMPLETED / FAILED
3. Fetch the result JSONL URL
4. Stream and process the JSONL line by line
With a job queue sitting in front of all of this, you can manage multiple stores, retry failures, and have full visibility over what ran and when.
The MySQL Job Queue
First, the queue table. Keep it simple — you don't need a full message broker for this pattern.
CREATE TABLE shopify_bulk_jobs (
    id              INT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    shop_domain     VARCHAR(255) NOT NULL,
    operation_type  ENUM('mutation', 'query') NOT NULL,
    job_type        VARCHAR(100) NOT NULL,      -- e.g. 'inventory_levels', 'non_taxable_variants'
    shopify_bulk_id VARCHAR(255) DEFAULT NULL,  -- gid://shopify/BulkOperation/xxxxx
    input_file_url  TEXT DEFAULT NULL,          -- staged upload URL (mutations only)
    result_url      TEXT DEFAULT NULL,
    status          ENUM(
                        'pending',     -- waiting to be submitted
                        'submitted',   -- sent to Shopify, awaiting completion
                        'completed',   -- Shopify finished, result URL available
                        'processing',  -- we are currently consuming the result
                        'done',        -- fully processed by us
                        'failed'       -- Shopify or our processing errored
                    ) DEFAULT 'pending',
    error_message   TEXT DEFAULT NULL,
    created_at      DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at      DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
Inserting a job is a one-liner from whatever part of your application triggers the operation:
function queue_bulk_job($pdo, $shop_domain, $operation_type, $job_type, $input_file_url = null) {
    $stmt = $pdo->prepare(
        "INSERT INTO shopify_bulk_jobs (shop_domain, operation_type, job_type, input_file_url)
         VALUES (:shop, :type, :job_type, :input_url)"
    );
    $stmt->execute([
        'shop'      => $shop_domain,
        'type'      => $operation_type,
        'job_type'  => $job_type,
        'input_url' => $input_file_url
    ]);
    return $pdo->lastInsertId();
}
Example 1: Bulk Mutation — Setting Inventory Levels
For mutations, Shopify requires you to upload a JSONL file via a staged upload first. Each line is a JSON object representing the variables for one mutation call. Here we're setting inventory levels — every line represents one location/item combination.
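To make "the variables for one mutation call" concrete, here is a minimal sketch of what a single line could look like. It assumes the inventorySetOnHandQuantities mutation used later in this post, which takes one $input variable, so each line needs a top-level "input" key. The helper name, the sample IDs, and the 'correction' reason are illustrative assumptions rather than anything the post prescribes.

```php
<?php
// Hypothetical helper: encodes one inventory update as one JSONL line.
// The shape mirrors the mutation's variables: { "input": { ... } }.
function inventory_jsonl_line(string $inventory_item_gid, string $location_gid, int $quantity): string
{
    return json_encode([
        'input' => [
            'reason'        => 'correction', // assumed reason value
            'setQuantities' => [[
                'inventoryItemId' => $inventory_item_gid,
                'locationId'      => $location_gid,
                'quantity'        => $quantity,
            ]],
        ],
    ], JSON_UNESCAPED_SLASHES);
}

// Two updates become two lines, joined with a newline and nothing else.
$jsonl = implode("\n", [
    inventory_jsonl_line('gid://shopify/InventoryItem/111', 'gid://shopify/Location/222', 5),
    inventory_jsonl_line('gid://shopify/InventoryItem/333', 'gid://shopify/Location/222', 0),
]);

echo $jsonl, "\n";
```

Each line must be a complete JSON document on its own, which is why the lines are joined with a bare newline rather than wrapped in an array.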
Step 1: Build and stage the JSONL input
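/**
 * Builds a JSONL string for inventory level updates and stages it with Shopify.
 *
 * @param array  $inventory_updates Array of ['inventory_item_id' => ..., 'location_id' => ..., 'available' => ...]
 * @param string $shop_domain
 * @param string $access_token
 * @return string The staged upload path to pass to the bulk mutation
 */
function stage_inventory_jsonl($inventory_updates, $shop_domain, $access_token) {

    ###### 1. Build JSONL content ######
    // Each line holds the variables for one mutation call, so it must match the
    // mutation's signature: a single $input of type InventorySetOnHandQuantitiesInput.
    $lines = [];
    foreach ($inventory_updates as $update) {
        $lines[] = json_encode([
            'input' => [
                'reason'        => 'correction',
                'setQuantities' => [[
                    'inventoryItemId' => 'gid://shopify/InventoryItem/' . basename($update['inventory_item_id']),
                    'locationId'      => 'gid://shopify/Location/' . basename($update['location_id']),
                    'quantity'        => (int)$update['available']
                ]]
            ]
        ]);
    }
    $jsonl = implode("\n", $lines);

    ###### 2. Request a staged upload target from Shopify ######
    $stage_query = '
        mutation {
            stagedUploadsCreate(input: {
                resource: BULK_MUTATION_VARIABLES,
                filename: "inventory_levels.jsonl",
                mimeType: "text/jsonl",
                httpMethod: POST
            }) {
                stagedTargets {
                    url
                    resourceUrl
                    parameters { name value }
                }
                userErrors { field message }
            }
        }
    ';
    $stage_response = shopify_graphql_request($shop_domain, $access_token, $stage_query);
    $result = $stage_response['data']['stagedUploadsCreate'];
    if (!empty($result['userErrors'])) {
        throw new RuntimeException('stagedUploadsCreate failed: ' . json_encode($result['userErrors']));
    }
    $target = $result['stagedTargets'][0];

    ###### 3. POST the JSONL file to the staged URL ######
    // Multipart form upload: send every entry in $target['parameters'] as a form
    // field, followed by the JSONL as the final "file" field.
    shopify_post_staged_file($target['url'], $target['parameters'], $jsonl);

    // The bulk mutation expects the value of the "key" parameter, not resourceUrl
    foreach ($target['parameters'] as $param) {
        if ($param['name'] === 'key') {
            return $param['value'];
        }
    }
    throw new RuntimeException('No "key" parameter in staged upload target');
}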
Step 2: Submit the bulk mutation
/**
 * Submits a bulkOperationRunMutation to set inventory levels.
 *
 * @param string $staged_url The staged upload path obtained from stagedUploadsCreate
 * @param string $shop_domain
 * @param string $access_token
 * @return string The Shopify bulk operation GID
 */
function submit_inventory_bulk_mutation($staged_url, $shop_domain, $access_token) {
    $mutation = '
        mutation bulkOperationRunMutation(
            $mutation: String!,
            $stagedUploadPath: String!
        ) {
            bulkOperationRunMutation(
                mutation: $mutation,
                stagedUploadPath: $stagedUploadPath
            ) {
                bulkOperation { id status }
                userErrors { field message }
            }
        }
    ';

    // The mutation Shopify runs once per line of the staged JSONL file
    $inventory_mutation_string = '
        mutation inventorySetOnHandQuantities($input: InventorySetOnHandQuantitiesInput!) {
            inventorySetOnHandQuantities(input: $input) {
                inventoryAdjustmentGroup { id }
                userErrors { field message }
            }
        }
    ';

    $variables = [
        'mutation'         => $inventory_mutation_string,
        'stagedUploadPath' => $staged_url
    ];

    $response = shopify_graphql_request($shop_domain, $access_token, $mutation, $variables);

    // bulkOperation is null when Shopify rejects the request, so check userErrors first
    $result = $response['data']['bulkOperationRunMutation'];
    if (!empty($result['userErrors'])) {
        throw new RuntimeException('bulkOperationRunMutation failed: ' . json_encode($result['userErrors']));
    }
    return $result['bulkOperation']['id'];
}
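The functions in this post call shopify_graphql_request without defining it. What follows is a minimal sketch of one way it could look, assuming the cURL extension and the Admin API's /admin/api/{version}/graphql.json endpoint with the X-Shopify-Access-Token header. The API version constant, the timeout, the error handling, and the shopify_graphql_payload helper are all assumptions for illustration.

```php
<?php
// Assumed API version; substitute whichever version your app targets.
const SHOPIFY_API_VERSION = '2024-10';

// Builds the JSON body for a GraphQL POST. Variables are left out when empty.
function shopify_graphql_payload(string $query, array $variables = []): string
{
    $body = ['query' => $query];
    if ($variables) {
        $body['variables'] = $variables;
    }
    return json_encode($body);
}

// Sends the request and returns the decoded response, throwing on transport,
// HTTP, or top-level GraphQL errors so callers only see usable data.
function shopify_graphql_request(string $shop_domain, string $access_token, string $query, array $variables = []): array
{
    $url = "https://{$shop_domain}/admin/api/" . SHOPIFY_API_VERSION . "/graphql.json";

    $ch = curl_init($url);
    curl_setopt_array($ch, [
        CURLOPT_POST           => true,
        CURLOPT_POSTFIELDS     => shopify_graphql_payload($query, $variables),
        CURLOPT_HTTPHEADER     => [
            'Content-Type: application/json',
            'X-Shopify-Access-Token: ' . $access_token,
        ],
        CURLOPT_RETURNTRANSFER => true,
        CURLOPT_TIMEOUT        => 30,
    ]);

    $raw = curl_exec($ch);
    if ($raw === false) {
        throw new RuntimeException('cURL error: ' . curl_error($ch));
    }
    $http_code = curl_getinfo($ch, CURLINFO_HTTP_CODE);

    $decoded = json_decode($raw, true);
    if ($http_code !== 200 || !is_array($decoded)) {
        throw new RuntimeException("Unexpected response (HTTP {$http_code})");
    }
    if (!empty($decoded['errors'])) {
        throw new RuntimeException('GraphQL errors: ' . json_encode($decoded['errors']));
    }
    return $decoded;
}
```

Throwing on failure suits the cron script's try/catch around submission: any exception ends up in the job's error_message column instead of being silently dropped.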
Example 2: Bulk Query — Fetching Non-Taxable Variants
Bulk queries are simpler because no staging is required. You write a standard GraphQL query, pass it to bulkOperationRunQuery as a string, and Shopify iterates the entire dataset for you. There are no cursors or page sizes to manage.
/**
 * Submits a bulk query to retrieve all product variants where taxable = false.
 *
 * @param string $shop_domain
 * @param string $access_token
 * @return string The Shopify bulk operation GID
 */
function submit_non_taxable_variants_bulk_query($shop_domain, $access_token) {
    $query = '
        mutation {
            bulkOperationRunQuery(
                query: """
                {
                    productVariants(query: "taxable:false") {
                        edges {
                            node {
                                id
                                sku
                                taxable
                                price
                                product { id title }
                            }
                        }
                    }
                }
                """
            ) {
                bulkOperation { id status }
                userErrors { field message }
            }
        }
    ';

    $response = shopify_graphql_request($shop_domain, $access_token, $query);

    // bulkOperation is null when Shopify rejects the request, so check userErrors first
    $result = $response['data']['bulkOperationRunQuery'];
    if (!empty($result['userErrors'])) {
        throw new RuntimeException('bulkOperationRunQuery failed: ' . json_encode($result['userErrors']));
    }
    return $result['bulkOperation']['id'];
}
The Cron Job: Polling and Processing
This is the engine of the whole setup. A cron job fires every few minutes, looks at the queue, submits pending jobs, polls submitted ones, and processes any that have completed. Keep each run fast — do one unit of work per job per run and let the next cron tick move things forward.
#!/usr/bin/env php
<?php
/**
 * cron_bulk_jobs.php
 * Run this every 2–5 minutes via crontab (drop the backslash in the real entry):
 * *\/3 * * * * /usr/bin/php /var/www/app/cron/cron_bulk_jobs.php >> /var/log/shopify_bulk.log 2>&1
 */

require_once __DIR__ . '/../bootstrap.php';

$pdo = get_db_connection();

###### 1. Submit any pending jobs (one per shop at a time) ######
$pending = $pdo->query(
    "SELECT sbj.*
     FROM shopify_bulk_jobs sbj
     WHERE sbj.status = 'pending'
       AND NOT EXISTS (
           SELECT 1 FROM shopify_bulk_jobs active
           WHERE active.shop_domain = sbj.shop_domain
             AND active.status = 'submitted'
       )
     ORDER BY sbj.created_at ASC"
)->fetchAll(PDO::FETCH_ASSOC);

$submitted_shops = []; // shops that have received a job during this run

foreach ($pending as $job) {
    // NOT EXISTS only sees jobs submitted before this run, so skip any shop
    // that already had a job submitted earlier in this loop.
    if (isset($submitted_shops[$job['shop_domain']])) {
        continue;
    }

    try {
        $creds = get_shop_credentials($job['shop_domain']);

        if ($job['operation_type'] === 'mutation') {
            $bulk_id = submit_inventory_bulk_mutation(
                $job['input_file_url'],
                $job['shop_domain'],
                $creds['access_token']
            );
        } else {
            $bulk_id = submit_non_taxable_variants_bulk_query(
                $job['shop_domain'],
                $creds['access_token']
            );
        }

        $stmt = $pdo->prepare(
            "UPDATE shopify_bulk_jobs SET status = 'submitted', shopify_bulk_id = :bulk_id WHERE id = :id"
        );
        $stmt->execute(['bulk_id' => $bulk_id, 'id' => $job['id']]);
        $submitted_shops[$job['shop_domain']] = true;
        echo "[INFO] Submitted job #{$job['id']} ({$job['job_type']}) for {$job['shop_domain']}\n";
    } catch (Exception $e) {
        mark_job_failed($pdo, $job['id'], $e->getMessage());
    }
}

###### 2. Poll submitted jobs ######
$submitted = $pdo->query(
    "SELECT * FROM shopify_bulk_jobs WHERE status = 'submitted'"
)->fetchAll(PDO::FETCH_ASSOC);

foreach ($submitted as $job) {
    $creds  = get_shop_credentials($job['shop_domain']);
    $status = poll_bulk_operation($job['shopify_bulk_id'], $job['shop_domain'], $creds['access_token']);

    if ($status['status'] === 'COMPLETED') {
        $stmt = $pdo->prepare(
            "UPDATE shopify_bulk_jobs SET status = 'completed', result_url = :url WHERE id = :id"
        );
        $stmt->execute(['url' => $status['url'], 'id' => $job['id']]);
    } elseif (in_array($status['status'], ['FAILED', 'CANCELED', 'EXPIRED'], true)) {
        // All three are terminal: polling again will never produce a result
        mark_job_failed($pdo, $job['id'], "Shopify reported {$status['status']} status");
    }
    // CREATED / RUNNING / CANCELING / UNKNOWN: do nothing, wait for next cron tick
}

###### 3. Process completed jobs ######
$completed = $pdo->query(
    "SELECT * FROM shopify_bulk_jobs WHERE status = 'completed'"
)->fetchAll(PDO::FETCH_ASSOC);

foreach ($completed as $job) {
    $pdo->prepare("UPDATE shopify_bulk_jobs SET status='processing' WHERE id=:id")
        ->execute(['id' => $job['id']]);
    try {
        process_bulk_result($job);
        $pdo->prepare("UPDATE shopify_bulk_jobs SET status='done' WHERE id=:id")
            ->execute(['id' => $job['id']]);
        echo "[INFO] Job #{$job['id']} ({$job['job_type']}) marked done.\n";
    } catch (Exception $e) {
        mark_job_failed($pdo, $job['id'], $e->getMessage());
    }
}
The poll function
/**
 * Checks the current status of a Shopify bulk operation.
 *
 * @return array ['status' => 'RUNNING|COMPLETED|FAILED|...', 'url' => string|null]
 */
function poll_bulk_operation($bulk_id, $shop_domain, $access_token) {
    // Look the operation up by its ID. currentBulkOperation only returns the most
    // recent operation of one type (QUERY unless told otherwise), so polling it
    // without a type argument would never see a mutation job.
    $query = '
        query ($id: ID!) {
            node(id: $id) {
                ... on BulkOperation {
                    id
                    status
                    errorCode
                    url
                    objectCount
                }
            }
        }
    ';

    $response = shopify_graphql_request($shop_domain, $access_token, $query, ['id' => $bulk_id]);
    $op = $response['data']['node'] ?? null;

    // Guard: make sure we got back the job we asked for
    if (empty($op) || ($op['id'] ?? null) !== $bulk_id) {
        return ['status' => 'UNKNOWN', 'url' => null];
    }

    return [
        'status' => $op['status'],
        'url'    => $op['url'] ?? null
    ];
}
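Shopify's BulkOperationStatus enum includes CREATED, RUNNING, CANCELING, CANCELED, COMPLETED, EXPIRED and FAILED. As a minimal sketch, a small helper could keep the translation from those values to this post's queue statuses in one place. The function name is hypothetical and is not part of the cron script. Treating CANCELED and EXPIRED as failures is an assumption about how you want the queue to behave.

```php
<?php
// Hypothetical helper: maps a Shopify bulk operation status to the queue
// status a job should hold after a poll.
function queue_status_for(string $shopify_status): string
{
    switch ($shopify_status) {
        case 'COMPLETED':
            return 'completed';   // result URL is ready to consume
        case 'FAILED':
        case 'CANCELED':
        case 'EXPIRED':
            return 'failed';      // terminal states, nothing left to poll
        default:
            return 'submitted';   // CREATED, RUNNING, CANCELING, UNKNOWN: keep waiting
    }
}

// Example: decide what to write back for a few polled statuses.
foreach (['RUNNING', 'COMPLETED', 'EXPIRED'] as $polled) {
    echo $polled, ' => ', queue_status_for($polled), "\n";
}
```

Defaulting unknown values to 'submitted' means a status the code has never seen simply gets polled again on the next tick rather than failing the job outright.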
Processing the JSONL result
Once Shopify gives you the result URL, stream the JSONL file line by line — don't try to load the whole thing into memory if you're dealing with tens of thousands of variants.
/**
 * Streams and processes the JSONL result for a completed bulk job.
 */
function process_bulk_result($job) {
    // Shopify returns no URL when the operation produced no output
    if (empty($job['result_url'])) {
        return;
    }

    $handle = fopen($job['result_url'], 'r');
    if (!$handle) {
        throw new RuntimeException("Could not open result URL for job #{$job['id']}");
    }

    while (($line = fgets($handle)) !== false) {
        $record = json_decode(trim($line), true);
        if (empty($record)) continue;

        switch ($job['job_type']) {
            case 'inventory_levels':
                handle_inventory_result_record($record);
                break;
            case 'non_taxable_variants':
                handle_non_taxable_variant_record($record);
                break;
        }
    }

    fclose($handle);
}

function handle_non_taxable_variant_record($record) {
    // Each $record is one variant node from the bulk query result.
    // The product is a nested object on the variant, so it arrives inline;
    // __parentId only appears on children of nested connections.
    // Do whatever you need: log it, sync to your own DB, flag for review.
    error_log("Non-taxable variant: {$record['sku']} on product {$record['product']['id']}");
}

function handle_inventory_result_record($record) {
    // Mutation result records wrap each individual response in "data",
    // keyed by the mutation name. Check for userErrors and handle accordingly.
    $errors = $record['data']['inventorySetOnHandQuantities']['userErrors'] ?? [];
    if (!empty($errors)) {
        error_log("Inventory mutation error: " . json_encode($errors));
    }
}

function mark_job_failed($pdo, $job_id, $message) {
    $pdo->prepare(
        "UPDATE shopify_bulk_jobs SET status='failed', error_message=:msg WHERE id=:id"
    )->execute(['msg' => $message, 'id' => $job_id]);
    error_log("[ERROR] Job #{$job_id} failed: {$message}");
}
A Note on the Crontab Entry
Set the cron to run every few minutes — every 2 or 3 works well in practice. Shopify's smaller bulk operations often complete within a minute; larger ones with hundreds of thousands of records can take 10–15 minutes, and the queue just waits patiently for the next poll.
# Run the bulk job processor every 3 minutes
*/3 * * * * /usr/bin/php /var/www/app/cron/cron_bulk_jobs.php >> /var/log/shopify_bulk.log 2>&1
One important thing: add a check at the top of your cron script to bail out if a previous instance is still running. A lock file or a processing status check on the queue is enough. You don't want two cron processes trying to move the same job forward at the same time.
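As a minimal sketch of the lock-file approach, PHP's flock can take a non-blocking exclusive lock at the top of the script. The helper names and the lock path are assumptions for illustration. Any path writable by the cron user would do.

```php
<?php
// Hypothetical helper: returns an open, locked file handle, or null when
// another process already holds the lock.
function acquire_cron_lock(string $lock_path)
{
    $handle = fopen($lock_path, 'c'); // create if missing, never truncate
    if ($handle === false) {
        throw new RuntimeException("Could not open lock file {$lock_path}");
    }
    // LOCK_NB makes flock return immediately instead of waiting for the lock
    if (!flock($handle, LOCK_EX | LOCK_NB)) {
        fclose($handle);
        return null;
    }
    return $handle;
}

// Hypothetical helper: releases the lock and closes the handle.
function release_cron_lock($handle): void
{
    flock($handle, LOCK_UN);
    fclose($handle);
}

// Usage at the top of the cron script (lock path is an assumed location):
$lock = acquire_cron_lock(sys_get_temp_dir() . '/shopify_bulk_jobs.lock');
if ($lock === null) {
    echo "[INFO] Previous run still active, exiting.\n";
    exit(0);
}

// ... submit, poll, and process stages run here ...

release_cron_lock($lock);
```

Because the operating system drops the lock when the process exits, a crashed run does not leave a stale lock behind the way a "file exists" check would.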
Wrapping Up
The pattern here is intentionally straightforward. The MySQL queue gives you a full audit trail of every bulk job — when it was submitted, what Shopify gave back, whether it succeeded. The cron job is stateless and simple; all the state lives in the database. And because you're streaming the JSONL result line by line, memory consumption stays flat regardless of how many variants or inventory records you're dealing with.
The mutation and query sides are deliberately kept separate. Mutations need the staged upload step; queries don't. Both end up in the same queue and follow the same lifecycle, which makes the cron logic clean and easy to extend with new job types.
Scale this out to multiple stores by ensuring the one-at-a-time constraint is enforced per shop domain — which the NOT EXISTS clause in the pending query already does — and you have a solid foundation for any high-volume Shopify integration.