Mass Update Product Rate Plan Charge Tax Code

Edited

System: Zuora

Action: Mass Update the Product Catalog

Description: The goal of this is to provide a means of mass updating the product rate plan charge tax code field in Zuora for a certain number of charges. This request happens frequently for customers with a large number of rate plan charges.

This can be accomplished with a number of tools, including:

For simplicity, we are including the details of using a Python script that executes from a file placed on your desktop.

While each company's business systems team is organized differently, If you need business users, such as the accounting team or revops team to execute the script at their leisure, CodePulse may be the best option for you.

Use Case

Update the tax code field on a product rate plan charge in Zuora. The tax codes will be in a csv file in a folder on a computer (e.g. your desktop). There will be two columns in the csv file: the id of the product rate plan charge and the tax code.

It is best to test this script and your changes in a Sandbox environment prior to any production changes.

This script does the following:

  1. It uses the requests library to interact with the Zuora API.

  2. The update_tax_code function handles updating a single product rate plan charge.

  3. The main function reads the CSV file and processes the updates in batches.

  4. It uses a ThreadPoolExecutor to perform multiple updates concurrently, which helps speed up the process while still managing resource usage.

  5. The script processes updates in batches of 50 (adjustable) to avoid overwhelming your computer or the API.

  6. After processing all updates, the script writes any failures to a new CSV file named "failed_updates.csv" on your desktop (you can change this path as needed).

  7. There's a small delay between batches to further manage resource usage.

To use this script:

  1. Replace YOUR_API_TOKEN_HERE with your actual Zuora API token.

  2. Update the input_csv_path variable with the correct path to your input CSV file.

  3. Update the output_csv_path variable if you want the failure log saved somewhere other than your desktop.

  4. Adjust the batch_size if needed, based on your computer's performance.

  5. Make sure you have the requests library installed (pip install requests) before running the script.

Python Code

import csv
import time
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

# Zuora API configuration
BASE_URL = "https://rest.apisandbox.zuora.com/v1"
HEADERS = {
    "Content-Type": "application/json",
    "Accept": "application/json",
    "Authorization": "Bearer YOUR_API_TOKEN_HERE"  # Replace with your actual API token
}

# Function to update tax code for a single product rate plan charge
def update_tax_code(charge_id, tax_code):
    url = f"{BASE_URL}/object/product-rate-plan-charge/{charge_id}"
    payload = {"TaxCode": tax_code}
    
    try:
        response = requests.put(url, json=payload, headers=HEADERS)
        response.raise_for_status()
        print(f"Successfully updated tax code for charge ID: {charge_id}")
        return None  # Indicates success
    except requests.exceptions.RequestException as e:
        error_message = f"Failed to update tax code for charge ID: {charge_id}. Error: {str(e)}"
        print(error_message)
        return charge_id, tax_code, error_message  # Return failure info

# Main function to process the CSV and update tax codes
def main():
    input_csv_path = "/Users/YourUsername/Desktop/tax_codes.csv"  # Replace with your actual path
    output_csv_path = "/Users/YourUsername/Desktop/failed_updates.csv"  # Replace with your desired output path
    batch_size = 50  # Adjust this based on your computer's performance
    
    with open(input_csv_path, 'r') as csvfile:
        reader = csv.reader(csvfile)
        next(reader)  # Skip header row if present
        updates = [(row[0], row[1]) for row in reader]
    
    total_updates = len(updates)
    processed = 0
    failures = []
    
    print(f"Total updates to process: {total_updates}")
    
    # Process updates in batches
    for i in range(0, total_updates, batch_size):
        batch = updates[i:i+batch_size]
        
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = [executor.submit(update_tax_code, charge_id, tax_code) for charge_id, tax_code in batch]
            
            for future in as_completed(futures):
                result = future.result()
                if result:  # This means there was a failure
                    failures.append(result)
        
        processed += len(batch)
        print(f"Processed {processed}/{total_updates} updates")
        
        # Add a small delay between batches to avoid overwhelming the API
        time.sleep(2)
    
    # Write failures to CSV file
    if failures:
        with open(output_csv_path, 'w', newline='') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(['Charge ID', 'Tax Code', 'Error Message', 'Timestamp'])
            for failure in failures:
                writer.writerow([*failure, datetime.now().isoformat()])
        
        print(f"Wrote {len(failures)} failed updates to {output_csv_path}")
    else:
        print("All updates were successful!")

if __name__ == "__main__":
    main()