Shopify's Bulk Operations API was built for exactly this. You fire off a single GraphQL request to kick off the operation, Shopify does the heavy lifting in the background, and when it's done it hands you a JSONL file to consume. The catch is that your application needs to manage that job — submitting it, polling its status, and acting on the result. Do that in a cron-driven, MySQL-queued PHP setup and you have something robust enough to run unattended.

This post walks through the PHP methodology for both a bulk mutation (setting inventory levels) and a bulk query (fetching non-taxable variants), with a MySQL job queue and a cron job to keep everything ticking over.

I'm going to assume you already have your Shopify GraphQL delivery layer in place — cURL, saved credentials, access token, the lot. This post is about the orchestration pattern around bulk operations, not the HTTP plumbing.

How Bulk Operations Work

There are two flavours. A bulkOperationRunMutation takes a JSONL file of input objects and runs the same mutation against each one. A bulkOperationRunQuery runs a single query with Shopify's @inContext pagination support across your entire catalogue. Both return a job ID immediately. You then poll currentBulkOperation until the status is COMPLETED, then fetch the JSONL result URL and process it.

The lifecycle looks like this:

1. Submit bulk operation  →  receive operationId
2. Poll currentBulkOperation  →  wait for COMPLETED / FAILED
3. Fetch result JSONL URL
4. Stream & process the JSONL line by line

With a job queue sitting in front of all of this, you can manage multiple stores, retry failures, and have full visibility over what ran and when.

The MySQL Job Queue

First, the queue table. Keep it simple — you don't need a full message broker for this pattern.

CREATE TABLE shopify_bulk_jobs (
    id            INT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    shop_domain   VARCHAR(255)  NOT NULL,
    operation_type ENUM('mutation', 'query') NOT NULL,
    job_type      VARCHAR(100)  NOT NULL,          -- e.g. 'inventory_levels', 'non_taxable_variants'
    shopify_bulk_id VARCHAR(255) DEFAULT NULL,    -- gid://shopify/BulkOperation/xxxxx
    input_file_url TEXT          DEFAULT NULL,    -- staged upload URL (mutations only)
    result_url    TEXT          DEFAULT NULL,
    status        ENUM(
                    'pending',     -- waiting to be submitted
                    'submitted',   -- sent to Shopify, awaiting completion
                    'completed',   -- Shopify finished, result URL available
                    'processing', -- we are currently consuming the result
                    'done',        -- fully processed by us
                    'failed'       -- Shopify or our processing errored
                  ) DEFAULT 'pending',
    error_message TEXT          DEFAULT NULL,
    created_at    DATETIME      DEFAULT CURRENT_TIMESTAMP,
    updated_at    DATETIME      DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

Inserting a job is a one-liner from whatever part of your application triggers the operation:

function queue_bulk_job($pdo, $shop_domain, $operation_type, $job_type, $input_file_url = null) {
    $stmt = $pdo->prepare(
        "INSERT INTO shopify_bulk_jobs
            (shop_domain, operation_type, job_type, input_file_url)
         VALUES
            (:shop, :type, :job_type, :input_url)"
    );
    $stmt->execute([
        'shop'       => $shop_domain,
        'type'       => $operation_type,
        'job_type'   => $job_type,
        'input_url'  => $input_file_url
    ]);
    return $pdo->lastInsertId();
}

Example 1: Bulk Mutation — Setting Inventory Levels

For mutations, Shopify requires you to upload a JSONL file via a staged upload first. Each line is a JSON object representing the variables for one mutation call. Here we're setting inventory levels — every line represents one location/item combination.

Step 1: Build and stage the JSONL input

/**
 * Builds a JSONL string for inventory level updates and stages it with Shopify.
 *
 * @param array  $inventory_updates  Array of ['inventory_item_id' => ..., 'location_id' => ..., 'available' => ...]
 * @param string $shop_domain
 * @param string $access_token
 * @return string  The staged upload URL to pass to the bulk mutation
 */
function stage_inventory_jsonl($inventory_updates, $shop_domain, $access_token) {

    ###### 1. Build JSONL content ######
    $lines = [];
    foreach ($inventory_updates as $update) {
        $lines[] = json_encode([
            'inventoryItemId' => 'gid://shopify/InventoryItem/' . basename($update['inventory_item_id']),
            'locationId'      => 'gid://shopify/Location/'      . basename($update['location_id']),
            'available'       => (int)$update['available']
        ]);
    }
    $jsonl = implode("\n", $lines);

    ###### 2. Request a staged upload URL from Shopify ######
    $stage_query = '
        mutation {
            stagedUploadsCreate(input: {
                resource: BULK_MUTATION_VARIABLES,
                filename: "inventory_levels.jsonl",
                mimeType: "text/jsonl",
                httpMethod: PUT
            }) {
                stagedTargets {
                    url
                    resourceUrl
                    parameters { name value }
                }
                userErrors { field message }
            }
        }
    ';

    $stage_response = shopify_graphql_request($shop_domain, $access_token, $stage_query);
    $target = $stage_response['data']['stagedUploadsCreate']['stagedTargets'][0];

    ###### 3. PUT the JSONL file to the staged URL ######
    shopify_put_staged_file($target['url'], $jsonl);

    return $target['resourceUrl']; // this is what goes into the bulk mutation
}

Step 2: Submit the bulk mutation

/**
 * Submits a bulkOperationRunMutation to set inventory levels.
 *
 * @param string $staged_url   The resourceUrl returned by stagedUploadsCreate
 * @param string $shop_domain
 * @param string $access_token
 * @return string  The Shopify bulk operation GID
 */
function submit_inventory_bulk_mutation($staged_url, $shop_domain, $access_token) {

    $mutation = '
        mutation bulkOperationRunMutation(
            $mutation: String!,
            $stagedUploadPath: String!
        ) {
            bulkOperationRunMutation(
                mutation: $mutation,
                stagedUploadPath: $stagedUploadPath
            ) {
                bulkOperation {
                    id
                    status
                }
                userErrors { field message }
            }
        }
    ';

    $inventory_mutation_string = '
        mutation inventorySetOnHandQuantities($input: InventorySetOnHandQuantitiesInput!) {
            inventorySetOnHandQuantities(input: $input) {
                inventoryAdjustmentGroup { id }
                userErrors { field message }
            }
        }
    ';

    $variables = [
        'mutation'          => $inventory_mutation_string,
        'stagedUploadPath'  => $staged_url
    ];

    $response = shopify_graphql_request($shop_domain, $access_token, $mutation, $variables);

    return $response['data']['bulkOperationRunMutation']['bulkOperation']['id'];
}

Example 2: Bulk Query — Fetching Non-Taxable Variants

Bulk queries are simpler — no staging required. You write a standard GraphQL query using the @inContext pagination hint and Shopify iterates the entire dataset for you.

/**
 * Submits a bulk query to retrieve all product variants where taxable = false.
 *
 * @param string $shop_domain
 * @param string $access_token
 * @return string  The Shopify bulk operation GID
 */
function submit_non_taxable_variants_bulk_query($shop_domain, $access_token) {

    $query = '
        mutation {
            bulkOperationRunQuery(
                query: """
                    {
                        productVariants(query: "taxable:false") {
                            edges {
                                node {
                                    id
                                    sku
                                    taxable
                                    price
                                    product {
                                        id
                                        title
                                    }
                                }
                            }
                        }
                    }
                """
            ) {
                bulkOperation {
                    id
                    status
                }
                userErrors { field message }
            }
        }
    ';

    $response = shopify_graphql_request($shop_domain, $access_token, $query);

    return $response['data']['bulkOperationRunQuery']['bulkOperation']['id'];
}
Note that Shopify only allows one bulk operation per store at a time. If your queue has multiple pending jobs for the same shop, you need to hold off submitting the next one until the first has completed. The queue status column handles this cleanly — only submit a new job when the previous submitted one reaches done.

The Cron Job: Polling and Processing

This is the engine of the whole setup. A cron job fires every few minutes, looks at the queue, submits pending jobs, polls submitted ones, and processes any that have completed. Keep each run fast — do one unit of work per job per run and let the next cron tick move things forward.

#!/usr/bin/env php
/**
 * cron_bulk_jobs.php
 * Run this every 2–5 minutes via crontab:
 *   *\/3 * * * * /usr/bin/php /var/www/app/cron/cron_bulk_jobs.php >> /var/log/shopify_bulk.log 2>&1
 */

require_once __DIR__ . '/../bootstrap.php';

$pdo = get_db_connection();

###### 1. Submit any pending jobs (one per shop at a time) ######

$pending = $pdo->query(
    "SELECT sbj.*
       FROM shopify_bulk_jobs sbj
      WHERE sbj.status = 'pending'
        AND NOT EXISTS (
            SELECT 1 FROM shopify_bulk_jobs active
             WHERE active.shop_domain  = sbj.shop_domain
               AND active.status = 'submitted'
        )
      ORDER BY sbj.created_at ASC"
)->fetchAll(PDO::FETCH_ASSOC);

foreach ($pending as $job) {
    $creds = get_shop_credentials($job['shop_domain']);

    try {
        if ($job['operation_type'] === 'mutation') {
            $bulk_id = submit_inventory_bulk_mutation(
                $job['input_file_url'],
                $job['shop_domain'],
                $creds['access_token']
            );
        } else {
            $bulk_id = submit_non_taxable_variants_bulk_query(
                $job['shop_domain'],
                $creds['access_token']
            );
        }

        $stmt = $pdo->prepare(
            "UPDATE shopify_bulk_jobs
                SET status = 'submitted', shopify_bulk_id = :bulk_id
              WHERE id = :id"
        );
        $stmt->execute(['bulk_id' => $bulk_id, 'id' => $job['id']]);
        echo "[INFO] Submitted job #{$job['id']} ({$job['job_type']}) for {$job['shop_domain']}\n";

    } catch (Exception $e) {
        mark_job_failed($pdo, $job['id'], $e->getMessage());
    }
}

###### 2. Poll submitted jobs ######

$submitted = $pdo->query(
    "SELECT * FROM shopify_bulk_jobs WHERE status = 'submitted'"
)->fetchAll(PDO::FETCH_ASSOC);

foreach ($submitted as $job) {
    $creds  = get_shop_credentials($job['shop_domain']);
    $status = poll_bulk_operation($job['shopify_bulk_id'], $job['shop_domain'], $creds['access_token']);

    if ($status['status'] === 'COMPLETED') {
        $stmt = $pdo->prepare(
            "UPDATE shopify_bulk_jobs
                SET status = 'completed', result_url = :url
              WHERE id = :id"
        );
        $stmt->execute(['url' => $status['url'], 'id' => $job['id']]);

    } elseif ($status['status'] === 'FAILED') {
        mark_job_failed($pdo, $job['id'], 'Shopify reported FAILED status');
    }
    // RUNNING / CANCELING: do nothing, wait for next cron tick
}

###### 3. Process completed jobs ######

$completed = $pdo->query(
    "SELECT * FROM shopify_bulk_jobs WHERE status = 'completed'"
)->fetchAll(PDO::FETCH_ASSOC);

foreach ($completed as $job) {
    $pdo->prepare("UPDATE shopify_bulk_jobs SET status='processing' WHERE id=:id")
        ->execute(['id' => $job['id']]);

    try {
        process_bulk_result($job);
        $pdo->prepare("UPDATE shopify_bulk_jobs SET status='done' WHERE id=:id")
            ->execute(['id' => $job['id']]);
        echo "[INFO] Job #{$job['id']} ({$job['job_type']}) marked done.\n";

    } catch (Exception $e) {
        mark_job_failed($pdo, $job['id'], $e->getMessage());
    }
}

The poll function

/**
 * Checks the current status of a Shopify bulk operation.
 *
 * @return array ['status' => 'RUNNING|COMPLETED|FAILED|...', 'url' => string|null]
 */
function poll_bulk_operation($bulk_id, $shop_domain, $access_token) {
    $query = '
        query {
            currentBulkOperation {
                id
                status
                errorCode
                url
                objectCount
            }
        }
    ';

    $response = shopify_graphql_request($shop_domain, $access_token, $query);
    $op       = $response['data']['currentBulkOperation'];

    // Guard: make sure we're polling the right job
    if (empty($op) || $op['id'] !== $bulk_id) {
        return ['status' => 'UNKNOWN', 'url' => null];
    }

    return [
        'status' => $op['status'],
        'url'    => $op['url'] ?? null
    ];
}

Processing the JSONL result

Once Shopify gives you the result URL, stream the JSONL file line by line — don't try to load the whole thing into memory if you're dealing with tens of thousands of variants.

/**
 * Streams and processes the JSONL result for a completed bulk job.
 */
function process_bulk_result($job) {
    $handle = fopen($job['result_url'], 'r');
    if (!$handle) {
        throw new RuntimeException("Could not open result URL for job #{$job['id']}");
    }

    while (($line = fgets($handle)) !== false) {
        $record = json_decode(trim($line), true);
        if (empty($record)) continue;

        switch ($job['job_type']) {
            case 'inventory_levels':
                handle_inventory_result_record($record);
                break;
            case 'non_taxable_variants':
                handle_non_taxable_variant_record($record);
                break;
        }
    }

    fclose($handle);
}

function handle_non_taxable_variant_record($record) {
    // Each $record is one variant node from the bulk query result.
    // Do whatever you need — log it, sync to your own DB, flag for review.
    error_log("Non-taxable variant: {$record['sku']} on product {$record['__parentId']}");
}

function handle_inventory_result_record($record) {
    // Mutation result records contain the response from each individual mutation call.
    // Check for userErrors here and handle accordingly.
    if (!empty($record['userErrors'])) {
        error_log("Inventory mutation error: " . json_encode($record['userErrors']));
    }
}

function mark_job_failed($pdo, $job_id, $message) {
    $pdo->prepare(
        "UPDATE shopify_bulk_jobs SET status='failed', error_message=:msg WHERE id=:id"
    )->execute(['msg' => $message, 'id' => $job_id]);
    error_log("[ERROR] Job #{$job_id} failed: {$message}");
}

A Note on the Crontab Entry

Set the cron to run every few minutes — every 2 or 3 works well in practice. Shopify's smaller bulk operations often complete within a minute; larger ones with hundreds of thousands of records can take 10–15 minutes, and the queue just waits patiently for the next poll.

# Run the bulk job processor every 3 minutes
*/3 * * * * /usr/bin/php /var/www/app/cron/cron_bulk_jobs.php >> /var/log/shopify_bulk.log 2>&1

One important thing: add a check at the top of your cron script to bail out if a previous instance is still running. A lock file or a processing status check on the queue is enough. You don't want two cron processes trying to move the same job forward at the same time.

Wrapping Up

The pattern here is intentionally straightforward. The MySQL queue gives you a full audit trail of every bulk job — when it was submitted, what Shopify gave back, whether it succeeded. The cron job is stateless and simple; all the state lives in the database. And because you're streaming the JSONL result line by line, memory consumption stays flat regardless of how many variants or inventory records you're dealing with.

The mutation and query sides are deliberately kept separate. Mutations need the staged upload step; queries don't. Both end up in the same queue and follow the same lifecycle, which makes the cron logic clean and easy to extend with new job types.

Scale this out to multiple stores by ensuring the one-at-a-time constraint is enforced per shop domain — which the NOT EXISTS clause in the pending query already does — and you have a solid foundation for any high-volume Shopify integration.