
Plotting Average Price Data

Source: Notion | Last edited: 2023-08-09 | ID: c9095a24-883...



How to plot the average?

from rich import print
from rich.traceback import install
import requests
from collections import defaultdict
from datetime import datetime
import json


def load_config(filename="config.json"):
    '''
    config.json file contains the following:
    {
        "api_key": "key_string_here"
    }
    '''
    with open(filename, 'r') as file:
        config = json.load(file)
    return config


config = load_config()
api_key = config['api_key']

install()

# Set the API endpoint for historical trades
endpoint = 'https://api.binance.com/api/v3/historicalTrades'

# Define the parameters for the request
params = {
    'symbol': 'BTCUSDT',  # Symbol of the trading pair
    'limit': 1000,        # Number of trades to retrieve (max 1000)
    'fromId': 999999999   # Trade ID to fetch from (optional)
}

# Add the API key to the request headers
headers = {'X-MBX-APIKEY': api_key}

# Send the GET request
response = requests.get(endpoint, params=params, headers=headers)
response_json = response.json()

# Group trades by 1-second intervals
intervals = defaultdict(list)
for trade in response_json:
    timestamp = trade['time']
    # Convert timestamp to second
    second = timestamp // 1000
    intervals[second].append(trade)

# Process the intervals as needed
for second, trades in intervals.items():
    # Convert Unix timestamp to human-readable date and time
    date_time = datetime.utcfromtimestamp(second).strftime('%Y-%m-%d %H:%M:%S')
    print(f"[bold cyan]{date_time}[/bold cyan] - {len(trades)} trades")

    # Calculate aggregated data for this interval
    total_volume = sum(float(trade['qty']) for trade in trades)
    avg_price = sum(float(trade['price']) for trade in trades) / len(trades)
    print(f"Total volume: {total_volume} - Average price: {avg_price}")

# Additional processing can be done here to analyze or visualize the data

The code snippet you’ve provided fetches historical trade data from the Binance API, groups the trades by 1-second intervals, and calculates the total volume and average price for each interval. Now, if you want to plot the average price over time, you can do so using libraries like matplotlib or seaborn.
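To make the grouping step concrete, here is a minimal offline sketch that uses three made-up trades in place of an API response. The sample values and the helper name `summarize_by_second` are assumptions for illustration; only the `time`, `price` and `qty` fields mirror what the script reads.

```python
from collections import defaultdict

# Hypothetical sample trades: timestamps are in milliseconds, and prices
# and quantities are strings, as in the script above.
sample_trades = [
    {"time": 1680566400100, "price": "28000.00", "qty": "0.5"},
    {"time": 1680566400900, "price": "28010.00", "qty": "1.5"},
    {"time": 1680566401200, "price": "28020.00", "qty": "1.0"},
]


def summarize_by_second(trades):
    """Group trades by Unix second; return {second: (count, total_volume, avg_price)}."""
    intervals = defaultdict(list)
    for trade in trades:
        intervals[trade["time"] // 1000].append(trade)

    summary = {}
    for second, group in intervals.items():
        total_volume = sum(float(t["qty"]) for t in group)
        avg_price = sum(float(t["price"]) for t in group) / len(group)
        summary[second] = (len(group), total_volume, avg_price)
    return summary


summary = summarize_by_second(sample_trades)
for second, (count, volume, avg_price) in sorted(summary.items()):
    print(second, count, volume, avg_price)
```

The first two trades share a second, so they are averaged together. Note that this is a simple mean of the trade prices; it does not weight each trade by its quantity.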

First, you’ll need to collect the data you want to plot. In this case, you’ll be plotting the average price against the timestamp. Here’s an example of how you can modify the code to create a plot:

  1. Import matplotlib: import matplotlib.pyplot as plt.
  2. Create two lists to store the timestamps and average prices.
  3. Add the values to the lists as you iterate through the trades.
  4. Use matplotlib to plot the data.

Here’s the modified code snippet:

import matplotlib.pyplot as plt

# ...

# Process the intervals as needed
timestamps = []
avg_prices = []
for second, trades in intervals.items():
    # Convert Unix timestamp to human-readable date and time
    date_time = datetime.utcfromtimestamp(second).strftime('%Y-%m-%d %H:%M:%S')
    print(f"[bold cyan]{date_time}[/bold cyan] - {len(trades)} trades")

    # Calculate aggregated data for this interval
    total_volume = sum(float(trade['qty']) for trade in trades)
    avg_price = sum(float(trade['price']) for trade in trades) / len(trades)
    print(f"Total volume: {total_volume} - Average price: {avg_price}")

    timestamps.append(date_time)
    avg_prices.append(avg_price)

# Plotting the average price
plt.plot(timestamps, avg_prices)
plt.xlabel('Time')
plt.ylabel('Average Price')
plt.title('Average Price Over Time')
plt.xticks(rotation=45)
plt.show()

This code will display a line plot showing the average price over time. You may want to further customize the plot (e.g., changing the tick frequency on the x-axis) to suit your specific needs.

Can this function be added to the script to find the max drawdown?


Yes, you can add a function to calculate the maximum drawdown within your code. The maximum drawdown is a measure of the largest decline from a peak to a trough in a given dataset, often used to analyze the risk or volatility in financial investments.

You can calculate the maximum drawdown by tracking the maximum value up to the current point and the subsequent relative drops from this maximum. Here’s how you can add a function to calculate the maximum drawdown in your script:

  1. Create a function to calculate the maximum drawdown.
  2. Apply this function to the average prices.

Below is the code to calculate the maximum drawdown:

def max_drawdown(prices):
    max_value = prices[0]
    max_drawdown_value = 0
    for price in prices[1:]:
        if price > max_value:
            max_value = price
        else:
            drawdown = (max_value - price) / max_value
            if drawdown > max_drawdown_value:
                max_drawdown_value = drawdown
    return max_drawdown_value * 100  # Convert to percentage

# ...

# Process the intervals as needed
avg_prices = []
for second, trades in intervals.items():
    # ...
    avg_price = sum(float(trade['price']) for trade in trades) / len(trades)
    avg_prices.append(avg_price)
    # ...

# Calculate and print the maximum drawdown
drawdown = max_drawdown(avg_prices)
print(f"Maximum Drawdown: {drawdown:.2f}%")

This code will print the maximum drawdown of the average prices as a percentage. It’s important to note that the drawdown is a relative measure, so the order of the data matters. Make sure that the avg_prices list is in chronological order, so that the drawdown calculation reflects the historical price movement.
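A small worked example may help show why ordering matters. The sketch below repeats the same running-peak logic on five made-up prices, first in chronological order and then reversed; the price values are assumptions chosen only to keep the arithmetic easy to follow.

```python
def max_drawdown(prices):
    """Largest peak-to-trough decline, as a percentage of the peak."""
    max_value = prices[0]
    max_drawdown_value = 0
    for price in prices[1:]:
        if price > max_value:
            max_value = price
        else:
            drawdown = (max_value - price) / max_value
            max_drawdown_value = max(max_drawdown_value, drawdown)
    return max_drawdown_value * 100


prices = [100, 120, 90, 110, 80]  # made-up values, oldest first

chronological = max_drawdown(prices)         # peak 120, later trough 80
reversed_order = max_drawdown(prices[::-1])  # same values, wrong order

print(f"{chronological:.2f}% vs {reversed_order:.2f}%")  # 33.33% vs 18.18%
```

In chronological order the largest drop is from 120 to 80, or 40/120. Reversed, the largest drop is from 110 to 90, or 20/110, so the same set of prices gives a different answer.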


Allow the user to specify a start date and time, and automatically retrieve the next 30 minutes of trade data to construct second-based data:

from rich import print
from rich.traceback import install
import requests
from collections import defaultdict
from datetime import datetime
import json
import matplotlib.pyplot as plt
import numpy as np


def calculate_max_drawdown(nav_values):
    running_max = np.maximum.accumulate(nav_values)
    drawdowns = 1 - nav_values / running_max
    return np.max(drawdowns)


def load_config(filename="config.json"):
    '''
    config.json file contains the following:
    {
        "api_key": "key_string_here"
    }
    '''
    with open(filename, 'r') as file:
        config = json.load(file)
    return config


config = load_config()
api_key = config['api_key']

install()

# Set the API endpoint for historical trades
endpoint = 'https://api.binance.com/api/v3/historicalTrades'

# Define the parameters for the request
params = {
    'symbol': 'BTCUSDT',  # Symbol of the trading pair
    'limit': 1000,        # Number of trades to retrieve (max 1000)
    'fromId': 999999999   # Trade ID to fetch from (optional)
}

# Add the API key to the request headers
headers = {'X-MBX-APIKEY': api_key}

# Send the GET request
response = requests.get(endpoint, params=params, headers=headers)
response_json = response.json()

# Group trades by 1-second intervals
intervals = defaultdict(list)
for trade in response_json:
    timestamp = trade['time']
    # Convert timestamp to second
    second = timestamp // 1000
    intervals[second].append(trade)

# Process the intervals as needed
for second, trades in intervals.items():
    # Convert Unix timestamp to human-readable date and time
    date_time = datetime.utcfromtimestamp(second).strftime('%Y-%m-%d %H:%M:%S')
    print(f"[bold cyan]{date_time}[/bold cyan] - {len(trades)} trades")

    # Calculate aggregated data for this interval
    total_volume = sum(float(trade['qty']) for trade in trades)
    avg_price = sum(float(trade['price']) for trade in trades) / len(trades)
    print(f"Total volume: {total_volume} - Average price: {avg_price}")

# Additional processing can be done here to analyze or visualize the data

# Process the intervals as needed
timestamps = []
avg_prices = []
for second, trades in intervals.items():
    # Convert Unix timestamp to human-readable date and time
    date_time = datetime.utcfromtimestamp(second).strftime('%Y-%m-%d %H:%M:%S')
    print(f"[bold cyan]{date_time}[/bold cyan] - {len(trades)} trades")

    # Calculate aggregated data for this interval
    total_volume = sum(float(trade['qty']) for trade in trades)
    avg_price = sum(float(trade['price']) for trade in trades) / len(trades)
    print(f"Total volume: {total_volume} - Average price: {avg_price}")

    timestamps.append(date_time)
    avg_prices.append(avg_price)

# Calculate the max drawdown
max_drawdown = calculate_max_drawdown(np.array(avg_prices))
print(f"Max Drawdown: {max_drawdown * 100:.4f}%")
print(f"Max Drawdown (fractional): {max_drawdown:.6f}")

# Plotting the average price
plt.plot(timestamps, avg_prices)
plt.xlabel('Time')
plt.ylabel('Average Price')
plt.title('Average Price Over Time')
plt.xticks(rotation=45)
plt.show()

The code snippet you provided includes a function to calculate the maximum drawdown and plots the average prices. However, it doesn’t include functionality to allow the user to specify a start date and time, and then retrieve the next 30 minutes of trade data.

You can add this functionality by taking the following steps:

  1. Prompt the user to enter a start date and time.
  2. Convert the input into a UNIX timestamp.
  3. Filter the returned trades to the 30-minute window that starts at that timestamp.

Here’s the updated code snippet that includes these modifications:

from datetime import timedelta, timezone

# Prompt the user for the start date and time.
# The input is read as UTC, to match the UTC times the script prints.
start_date_time_str = input("Enter the start date and time (YYYY-MM-DD HH:MM:SS): ")
start_date_time = datetime.strptime(start_date_time_str, '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)
end_date_time = start_date_time + timedelta(minutes=30)

# Convert to UNIX timestamp (milliseconds)
start_timestamp = int(start_date_time.timestamp() * 1000)
end_timestamp = int(end_date_time.timestamp() * 1000)

# ...

# Group trades by 1-second intervals, considering only the specified 30-minute window
intervals = defaultdict(list)
for trade in response_json:
    timestamp = trade['time']
    if timestamp < start_timestamp or timestamp > end_timestamp:
        continue
    # Convert timestamp to second
    second = timestamp // 1000
    intervals[second].append(trade)

# ...
# Rest of the code remains the same

This code snippet now prompts the user to enter a start date and time and keeps only the trades that fall within the 30 minutes after it. It filters the response that was already fetched; it does not change which trades the API returns. Make sure that the input matches the expected date and time format (YYYY-MM-DD HH:MM:SS).
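One detail worth checking is the time zone used when turning the input into milliseconds. The script prints times in UTC (via `utcfromtimestamp`), whereas calling `.timestamp()` on a naive `datetime` interprets it in the machine's local time zone. The sketch below reads the input as UTC; the helper name `window_ms` is an assumption for illustration.

```python
from datetime import datetime, timedelta, timezone


def window_ms(start_str, minutes=30):
    """Return (start_ms, end_ms) for a window beginning at start_str, read as UTC."""
    start = datetime.strptime(start_str, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
    end = start + timedelta(minutes=minutes)
    return int(start.timestamp() * 1000), int(end.timestamp() * 1000)


start_ms, end_ms = window_ms("2023-04-04 00:00:00")
print(start_ms, end_ms)  # 1680566400000 1680568200000
```

If local time is what you intend, drop the `replace(tzinfo=timezone.utc)` call, but then the printed UTC times will be offset from the values you typed.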

Please note that the code currently assumes that the API request will return all the trades for the specified 30-minute window. Depending on the API’s behavior, you may need to implement additional logic to handle pagination or request multiple batches of data if the API does not return all the required trades in a single request.
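The pagination mentioned above could look roughly like the following sketch. It pages forward by trade ID until it passes the end of the window. `fetch_page` is a hypothetical callable, not part of any library; in the real script it would wrap the `requests.get` call with `fromId` and `limit` set. Here it is replaced by an offline stand-in so the loop can be run without an API key.

```python
def collect_window(fetch_page, from_id, start_ms, end_ms, limit=1000):
    """Collect trades with start_ms <= time <= end_ms, paging forward from from_id.

    fetch_page(from_id, limit) is a hypothetical callable returning a list of
    trade dicts with 'id' and 'time' keys, sorted by ascending id.
    """
    collected = []
    while True:
        batch = fetch_page(from_id, limit)
        if not batch:
            break  # no more trades available
        for trade in batch:
            if trade["time"] > end_ms:
                return collected  # passed the end of the window
            if trade["time"] >= start_ms:
                collected.append(trade)
        from_id = batch[-1]["id"] + 1  # continue after the last trade seen
    return collected


# Offline stand-in for the API: 50 fake trades, one every 100 ms.
fake_trades = [{"id": i, "time": 1_000_000 + i * 100} for i in range(50)]


def fake_fetch(from_id, limit):
    return [t for t in fake_trades if t["id"] >= from_id][:limit]


window = collect_window(fake_fetch, from_id=0, start_ms=1_001_000, end_ms=1_002_000, limit=7)
print(len(window), window[0]["id"], window[-1]["id"])  # 11 10 20
```

Each page corresponds to one request, so a busy 30-minute window may need many calls; it is worth checking the API's rate limits before running such a loop.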


Design, step by step, an intelligent search logic that can find the fromId that fits the starting point:

from rich import print
from rich.traceback import install
import requests
from collections import defaultdict
from datetime import datetime
import json
import matplotlib.pyplot as plt
import numpy as np


def calculate_max_drawdown(nav_values):
    running_max = np.maximum.accumulate(nav_values)
    drawdowns = 1 - nav_values / running_max
    return np.max(drawdowns)


def load_config(filename="config.json"):
    '''
    config.json file contains the following:
    {
        "api_key": "key_string_here"
    }
    '''
    with open(filename, 'r') as file:
        config = json.load(file)
    return config


config = load_config()
api_key = config['api_key']

install()

# Set the API endpoint for historical trades
endpoint = 'https://api.binance.com/api/v3/historicalTrades'

# Define the parameters for the request
params = {
    'symbol': 'BTCUSDT',  # Symbol of the trading pair
    'limit': 1000,        # Number of trades to retrieve (max 1000)
    'fromId': 999999999   # Trade ID to fetch from (optional)
}

# Add the API key to the request headers
headers = {'X-MBX-APIKEY': api_key}

# Send the GET request
response = requests.get(endpoint, params=params, headers=headers)
response_json = response.json()

# Group trades by 1-second intervals
intervals = defaultdict(list)
for trade in response_json:
    timestamp = trade['time']
    # Convert timestamp to second
    second = timestamp // 1000
    intervals[second].append(trade)

# Process the intervals as needed
for second, trades in intervals.items():
    # Convert Unix timestamp to human-readable date and time
    date_time = datetime.utcfromtimestamp(second).strftime('%Y-%m-%d %H:%M:%S')
    print(f"[bold cyan]{date_time}[/bold cyan] - {len(trades)} trades")

    # Calculate aggregated data for this interval
    total_volume = sum(float(trade['qty']) for trade in trades)
    avg_price = sum(float(trade['price']) for trade in trades) / len(trades)
    print(f"Total volume: {total_volume} - Average price: {avg_price}")

# Additional processing can be done here to analyze or visualize the data

# Process the intervals as needed
timestamps = []
avg_prices = []
for second, trades in intervals.items():
    # Convert Unix timestamp to human-readable date and time
    date_time = datetime.utcfromtimestamp(second).strftime('%Y-%m-%d %H:%M:%S')
    print(f"[bold cyan]{date_time}[/bold cyan] - {len(trades)} trades")

    # Calculate aggregated data for this interval
    total_volume = sum(float(trade['qty']) for trade in trades)
    avg_price = sum(float(trade['price']) for trade in trades) / len(trades)
    print(f"Total volume: {total_volume} - Average price: {avg_price}")

    timestamps.append(date_time)
    avg_prices.append(avg_price)

# Calculate the max drawdown
max_drawdown = calculate_max_drawdown(np.array(avg_prices))
print(f"Max Drawdown: {max_drawdown * 100:.4f}%")
print(f"Max Drawdown (fractional): {max_drawdown:.6f}")

# Plotting the average price
plt.plot(timestamps, avg_prices)
plt.xlabel('Time')
plt.ylabel('Average Price')
plt.title('Average Price Over Time')
plt.xticks(rotation=45)
plt.show()

Certainly! The code snippet you provided fetches historical trade data and performs some analysis on it, but it doesn’t include logic to find the appropriate fromId that corresponds to a specific starting point in time. You can add a search algorithm to find the correct fromId for a given start date and time. Here’s a step-by-step guide to implement this logic:

Prompt the user to enter a start date and time, and convert it into a UNIX timestamp (milliseconds).

start_date_time_str = input("Enter the start date and time (YYYY-MM-DD HH:MM:SS): ")
start_date_time = datetime.strptime(start_date_time_str, '%Y-%m-%d %H:%M:%S')
start_timestamp = int(start_date_time.timestamp() * 1000)

You can use a binary search-like algorithm to efficiently find the correct fromId. The idea is to make requests with different fromId values, comparing the timestamp of the first trade in the response to the target timestamp, and adjusting the search range accordingly.

Here’s a function to implement this logic:

def find_from_id(start_timestamp, symbol='BTCUSDT', limit=1000):
    lower_bound = 0
    upper_bound = 999999999
    while lower_bound <= upper_bound:
        mid = (lower_bound + upper_bound) // 2
        params['fromId'] = mid
        response = requests.get(endpoint, params=params, headers=headers)
        response_json = response.json()
        if response_json and 'time' in response_json[0]:
            first_timestamp = response_json[0]['time']
            if first_timestamp < start_timestamp:
                lower_bound = mid + 1
            elif first_timestamp > start_timestamp:
                upper_bound = mid - 1
            else:
                return mid
        else:
            print("An error occurred while fetching trade data.")
            return None
    return None

Call the search function to find the correct fromId for the given start timestamp:

from_id = find_from_id(start_timestamp)
if from_id is not None:
    print(f"Found fromId: {from_id}")
else:
    print("Could not find fromId for the specified timestamp.")

Update the request parameters to use the found fromId:

params['fromId'] = from_id

Continue with the rest of the code to fetch and analyze the data.

This approach searches for the starting point with far fewer requests than a linear scan through the trades. It assumes that the trades are sorted by fromId, and that the fromId range is known (in this case, 0 to 999999999). Note that the function returns a value only when the first trade of a batch has exactly the target timestamp; otherwise it returns None, and additional logic could be added to handle this case, such as using the closest fromId.


➜ CustomRepresentation git:(main) ✗ /opt/homebrew/bin/python3.10 /Users/terryli/CustomRepresentation/CustomRepresentation/v3_api_binance_trade_data.py
Enter the start date and time (YYYY-MM-DD HH:MM:SS): 2023-04-04 00:00:00
Could not find fromId for the specified timestamp.
2023-08-03 05:25:09 - 2 trades
Total volume: 0.077 - Average price: 29108.28
2023-08-03 05:25:10 - 2 trades
Total volume: 0.12230999999999999 - Average price: 29108.29
2023-08-03 05:25:11 - 5 trades
Total volume: 0.007509999999999999 - Average price: 29108.288

It looks like the find_from_id function did not find a matching fromId for the specified timestamp, so the script printed the most recent trades instead: with from_id set to None, requests leaves the fromId parameter out of the query, and the endpoint then returns the latest trades. The failed search might be due to a couple of reasons:

  1. Data Availability: The Binance API might not have historical trade data going back to the specified date (2023-04-04), particularly if you are accessing the data from a standard account without access to extended historical data.
  2. Search Logic: If the data does exist, there might be an issue with the binary search logic. It returns a result only when the first trade of a batch has exactly the target timestamp, to the millisecond, which is unlikely; it also assumes that the target lies within the fromId range 0 to 999999999.

To address these possibilities, you might consider:

  • Checking Data Availability: Review the Binance API documentation or contact their support to understand how far back the historical trade data goes for your specific account type.
  • Refining the Search Logic: If the data does exist, consider implementing a different search algorithm that relies on a different endpoint or method to find the starting point.
  • Iterative Approach: If the binary search is not suitable for this data, an iterative approach that fetches batches of trades and checks the timestamps might be necessary. This might be less efficient but more reliable.
  • Using a Different Endpoint: Binance may offer other endpoints better suited for accessing historical data by date rather than fromId. Check their API documentation for any such options.

Here’s an example of an iterative approach that you can try:
def find_from_id(start_timestamp, symbol='BTCUSDT', limit=1000):
    from_id = 0
    while True:
        params['fromId'] = from_id
        response = requests.get(endpoint, params=params, headers=headers)
        response_json = response.json()
        if response_json and 'time' in response_json[0]:
            first_timestamp = response_json[0]['time']
            last_timestamp = response_json[-1]['time']
            if first_timestamp <= start_timestamp <= last_timestamp:
                return from_id
            elif first_timestamp > start_timestamp:
                return None
            from_id = response_json[-1]['id'] + 1
        else:
            print("An error occurred while fetching trade data.")
            return None

This function retrieves batches of trades and checks whether the start timestamp falls within the range of the batch. It continues iterating until it finds the right batch or passes the start timestamp.

Please consult the Binance API documentation and consider the available options to ensure compliance with their guidelines and limitations.
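As one possible instance of the "different endpoint" idea: to my knowledge, Binance's aggregate trades endpoint (`/api/v3/aggTrades`) accepts `startTime` and `endTime` in milliseconds, and each returned entry reports the ID of its first underlying trade in the field `f`. Treat both points as assumptions to verify against the current API documentation. The sketch below only builds the query URL and reads `f` from a response-shaped list, so it runs without network access; the helper names are hypothetical.

```python
from urllib.parse import urlencode

AGG_TRADES_URL = "https://api.binance.com/api/v3/aggTrades"


def agg_trades_url(symbol, start_ms, end_ms, limit=1000):
    """Build a query URL asking for aggregate trades inside a time window."""
    query = urlencode({
        "symbol": symbol,
        "startTime": start_ms,
        "endTime": end_ms,
        "limit": limit,
    })
    return f"{AGG_TRADES_URL}?{query}"


def first_trade_id(agg_trades):
    """Return the first underlying trade ID ('f') of the earliest entry, or None."""
    return agg_trades[0]["f"] if agg_trades else None


url = agg_trades_url("BTCUSDT", 1680566400000, 1680568200000)
print(url)
```

If the endpoint behaves as assumed, the value returned by `first_trade_id` could serve as the `fromId` for the historical trades request, which would avoid searching by trade ID altogether.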


Find the closest one; an earlier one is good enough, and it does not need to be exact. Try to propose an intelligent algorithm.

Certainly! An intelligent algorithm to find the closest but earlier fromId to a specified timestamp can combine elements of binary search with iterative refinement. Here’s a step-by-step approach:

Start with a binary search to quickly narrow down the range where the target timestamp lies. This gives us an approximate location.

Once the binary search has narrowed down the range, switch to an iterative approach to find the closest but earlier fromId.

Return the found fromId that corresponds to the closest but earlier timestamp to the target.

Here’s the code to implement this algorithm:

def find_from_id(start_timestamp, symbol='BTCUSDT', limit=1000):
    lower_bound = 0
    upper_bound = 999999999

    # Step 1: Binary Search for the first trade at or after the target time
    while lower_bound < upper_bound:
        mid = (lower_bound + upper_bound) // 2
        params['fromId'] = mid
        response = requests.get(endpoint, params=params, headers=headers)
        response_json = response.json()
        if response_json and 'time' in response_json[0]:
            first_timestamp = response_json[0]['time']
            if first_timestamp < start_timestamp:
                lower_bound = mid + 1
            else:
                upper_bound = mid  # Keep mid in range: it may be the boundary trade
        else:
            print("An error occurred while fetching trade data.")
            return None

    # Step 2: Iterative Refinement
    from_id = lower_bound
    while True:
        params['fromId'] = from_id
        response = requests.get(endpoint, params=params, headers=headers)
        response_json = response.json()
        if response_json and 'time' in response_json[0]:
            first_timestamp = response_json[0]['time']
            if first_timestamp >= start_timestamp:
                return max(0, from_id - 1)  # Return the previous fromId
            from_id += 1  # Advance one trade at a time so the target is not overshot
        else:
            print("An error occurred while fetching trade data.")
            return None

# Usage:
from_id = find_from_id(start_timestamp)
if from_id is not None:
    print(f"Found fromId: {from_id}")
else:
    print("Could not find fromId for the specified timestamp.")

The binary search step narrows down the range quickly, and the iterative refinement step ensures that we find the closest but earlier fromId. It’s an intelligent approach that balances speed with accuracy.

Note: The above code assumes that trades are sorted by fromId, and that the fromId range is known (in this case, 0 to 999999999). The actual range may vary based on the specific symbol or exchange, so you might need to adjust these values accordingly.
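The "closest but earlier" search can be exercised offline before spending API requests on it. The sketch below is a plain lower-bound binary search over trade IDs. `first_time_at` is a hypothetical callable standing in for "request one trade with this fromId and read its time", and the fake data assumes one trade every 250 ms.

```python
def closest_earlier_id(first_time_at, target_ms, low, high):
    """Return the largest ID in [low, high] whose trade time is < target_ms, or None.

    first_time_at(trade_id) is a hypothetical callable returning the timestamp
    (ms) of that trade; times are assumed non-decreasing in trade_id.
    """
    # Find the smallest ID whose time is >= target_ms (a lower-bound search).
    lo, hi = low, high + 1
    while lo < hi:
        mid = (lo + hi) // 2
        if first_time_at(mid) < target_ms:
            lo = mid + 1
        else:
            hi = mid
    # The trade just before that boundary is the closest earlier one.
    return lo - 1 if lo > low else None


# Offline stand-in: trade i happened at 1_000_000 + 250 * i milliseconds.
def fake_time(trade_id):
    return 1_000_000 + 250 * trade_id


found = closest_earlier_id(fake_time, target_ms=1_010_100, low=0, high=9_999)
print(found, fake_time(found))  # 40 1010000
```

In the real script every call to `first_time_at` would be one request, so a range of about a billion IDs needs roughly 30 probes.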

How to make the default input 2023-04-04 00:00:00?


start_date_time_str = input("Enter the start date and time (YYYY-MM-DD HH:MM:SS): ")

To provide a default value for an input in Python, you can use the input function in combination with a conditional statement. You can check if the user input is empty and, if so, use the default value.

Here is how you can modify the line of code to add a default input value:

start_date_time_str = input("Enter the start date and time (YYYY-MM-DD HH:MM:SS, default is 2023-04-04 00:00:00): ")
if start_date_time_str == '':
    start_date_time_str = '2023-04-04 00:00:00'

In this case, if the user presses Enter without typing anything, the start_date_time_str will be set to ‘2023-04-04 00:00:00’.

Is there a more elegant one-liner?

Certainly, you can use the or operator to provide a default value for an input in Python in a one-liner:

start_date_time_str = input("Enter the start date and time (YYYY-MM-DD HH:MM:SS, default is 2023-04-04 00:00:00): ") or '2023-04-04 00:00:00'
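One caveat with the `or` idiom is that a line containing only spaces is not empty, so it would skip the default and then fail to parse. A small helper that strips the input first avoids this. The sketch below takes the raw string as an argument so it can be tried without typing anything; the names `parse_start` and `DEFAULT_START` are assumptions for illustration.

```python
from datetime import datetime

DEFAULT_START = "2023-04-04 00:00:00"


def parse_start(raw, default=DEFAULT_START):
    """Parse raw as 'YYYY-MM-DD HH:MM:SS', using default when raw is blank."""
    text = raw.strip() or default
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S")


print(parse_start(""))                     # falls back to the default
print(parse_start("   "))                  # whitespace-only also falls back
print(parse_start("2023-08-03 05:25:09"))  # an explicit value is used
```

In the script this would be called as `parse_start(input("Enter the start date and time: "))`.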