Making things, writing code, wasting time...


Flask asynchronous background tasks with Celery and Redis

Introduction:

This blog post will look at a practical example of how to implement asynchronous background tasks in a Flask environment, with an example taken from my ongoing project of building a Destiny the game inventory management web application.

During the design of DestinyVaultRaider.com one of the main pain points was manually updating my production environment every time the Destiny Manifest was changed. The development crew in Bungie were very active and were updating the Manifest right up to the launch of Destiny 2.

Before adding the background tasks, I had a small Python script running from my Raspberry Pi, which would send a request to Bungie, every 10 minutes, for the current Manifest version – if the stored Manifest version was different, the script would send me a message on a private Slack channel to notify me that the Manifest had changed and I’d need to update my Heroku environment.

If the Manifest version stored on my production environment didn’t match the current revision of the Manifest Bungie were using, my Flask application would raise an error, send the user an HTTP 500 Internal Server Error response and divert them to a generic error page. This made for a poor user experience, and with the Manifest being updated unpredictably (sometimes twice a week), I was left scrambling to update it as quickly as possible.

I store the Manifest in a Redis database, so using Redis as a message broker (see below) made sense for my application.

Introduction to Celery:

From the Celery docs: “Celery is an asynchronous task queue/job queue based on distributed message passing. It is focused on real-time operation, but supports scheduling as well”.

From the perspective of my app, I will be using Celery to create a scheduled task, which checks the Destiny Manifest every few minutes, and updates my Redis database as needed. I will also be creating an asynchronous task that will check the Manifest every time an authorised user sends a request to a specific endpoint on my server, such as https://www.destinyvaultraider.com/updateManifest.

For another look at implementing Celery with Flask, have a read of Miguel Grinberg’s blog on Celery with Flask.

How Celery works:

The asynchronous tasks will be set up as follows.

  1. Celery client:
    • This connects your Flask application to the Celery task. The client issues the commands for the task.
  2. Celery worker:
    • A process that runs a background task. I will have 2 background tasks: a scheduled task, and an asynchronous task called every time I visit a particular endpoint (/updateManifest).
  3. Message broker:
    • The Celery client communicates with the Celery worker through a message broker. I will be using Redis, but Celery also supports other brokers such as RabbitMQ.
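
To make the three roles concrete, here is a toy sketch that uses only the Python 3 standard library. It is not Celery: an in-memory queue stands in for the Redis broker, a thread stands in for the worker, and update_manifest() is a hypothetical task body.

```python
import queue
import threading

broker = queue.Queue()   # stands in for the Redis message broker
results = []             # where the worker records finished work


def worker():
    """Pull task messages off the broker until a None sentinel arrives."""
    while True:
        message = broker.get()
        if message is None:
            break
        func, args = message
        results.append(func(*args))


def update_manifest(version):
    """Hypothetical task body; the real one would talk to Bungie and Redis."""
    return "manifest updated to " + version


# Start the worker, then act as the client by enqueueing one task.
worker_thread = threading.Thread(target=worker)
worker_thread.start()
broker.put((update_manifest, ("v2",)))  # the client returns immediately
broker.put(None)                        # tell the worker to stop
worker_thread.join()
print(results)
```

The client only ever touches the queue, which is why the Flask request can return before the work is finished.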

Installing Celery and Redis:

An important note: one of the main reasons I went with Celery and Redis is that I do all of my development work on a Windows laptop, and most of the libraries and tutorials I found were geared towards developing in a Linux environment (which makes sense, as your web application is most likely deployed on a Linux system). Celery has dropped Windows support, but you can still install an earlier version to allow development on a Windows system.

Shout out to /u/WTRipper on Reddit for this information.

Installing Celery:

Celery can be installed with pip. Version 3.1.25 supports Windows and worked well for me:

pip uninstall celery 
pip install celery==3.1.25

Installing Redis:

Redis is not officially supported on Windows, but the Microsoft Open Tech group maintains a Windows port, which you can download here. (I downloaded the installer Redis-x64-3.0.504.msi).

The Flask application factory:

The Flask application factory concept is a methodology of structuring your app as a series of Blueprints, which can run individually, or together (even with different configurations). More than just this, it sets out a more standardised approach to designing an application.

This can also add a bit of complexity to designing an application, as most Celery tutorials focus on standalone applications and ignore the detail of integrating Celery into a large Flask application.

In my case, I will have a Blueprint for my API and a Main Blueprint for everything else.

Destiny Vault Raider app structure:

As Destiny Vault Raider uses the Flask application factory structure, each Blueprint is contained in its own folder. For DVR, I only need a “main” and “api” Blueprint, as I don’t require separate views for unauthenticated visitors (although it’s probably something I’ll add in future).

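The main files to look out for, each covered in the sections below, are celery_worker.py, config.py, app/__init__.py, app/getManifest.py and api_1_0/views.py.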

DestinyVaultRaider
│   celery_worker.py
│   config.py
│   manage.py
│
├───app
│   │   email.py
│   │   getManifest.py
│   │   models.py
│   │   __init__.py
│   │  
│   ├───api_1_0
│   │       views.py
│   │       __init__.py
│   │      
│   ├───main
│   │       errors.py
│   │       forms.py
│   │       Inventory_Management.py
│   │       OAuth_functions.py
│   │       views.py
│   │       __init__.py
│   │      
│   ├───static
│   │   │   style.css
│   │          
│   └───templates
│       │   index.html
│

Destiny Vault Raider updating manifest flow chart:

Flask application with Redis and Celery.


From the diagram, we can see:

  • How the Flask application connects to the Redis message broker.
  • The Message broker talks to the Celery worker.
  • The Celery worker calls (either the asynchronous or periodic) Python function to update the Redis Manifest database.
  • The Flask application can access the Manifest database directly, when a user makes a request to view their items.

Now, let’s turn these ideas into code!

Creating the Celery worker:

Create an instance of the Celery object and add the Celery configuration to it. The Celery configuration will be defined a little later in config.py.

app/__init__.py:

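from flask import Flask
from celery import Celery
from config import config, Config

# Create the Celery object at import time so tasks can be registered against it:
celery = Celery(__name__, broker=Config.CELERY_BROKER_URL)

def create_app(config_name):
    app = Flask(__name__)
    :
    :
    # Copy the Flask configuration (including the Celery settings) into Celery:
    celery.conf.update(app.config)
    :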

Adding the Celery worker to the app instance:

Take the instance of the celery object we created and add it to the app context (read about the app_context here).

celery_worker.py:

import os
from app import celery, create_app

app = create_app(os.getenv('FLASK_CONFIG') or 'default')
app.app_context().push()

Configuring Celery and Redis:

During development, your Celery client and Redis broker will be running on your local machine; in deployment, these connections will be to a remote server. As you’ll need 2 setups, you’ll need to create the Config setup for both development and deployment. On DVR, I set an environment variable “is_prod” to True, which allows me to test if I’m in the deployed environment.

All of this configuration will be added to the Celery object in app/__init__.py, when we create the celery object and pass in the config with the command: celery.conf.update(app.config).

config.py:

First, I create the setup for the Celery beat schedule. I set the schedule to 5 minutes, which is 300 seconds.

from datetime import timedelta

# Create Celery beat schedule:
celery_get_manifest_schedule = {
    'schedule-name': {
        'task': 'app.getManifest.periodic_run_get_manifest',
        'schedule': timedelta(seconds=300),
    },
}

Note: The task is named app.getManifest.periodic_run_get_manifest because it is located in the “app” folder, in the “getManifest” file, and the function is called periodic_run_get_manifest.
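
As a quick illustration of the naming and the interval, here is a minimal sketch using only the standard library. The schedule_entry variable is just a local copy of the entry above, not something Celery requires.

```python
from datetime import timedelta

schedule_entry = {
    "task": "app.getManifest.periodic_run_get_manifest",
    "schedule": timedelta(seconds=300),
}

# Split the dotted task name into its module path and function name.
module_path, function_name = schedule_entry["task"].rsplit(".", 1)
print(module_path)     # app.getManifest
print(function_name)   # periodic_run_get_manifest

# 300 seconds and 5 minutes describe the same interval.
print(schedule_entry["schedule"] == timedelta(minutes=5))  # True
```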

Next, I create the Config object, with the Celery and Redis settings, for both production and development.

import os

# "is_prod" is only set in the deployed environment, so it is None locally:
is_prod = os.environ.get('is_prod', None)

class Config:
    CELERYBEAT_SCHEDULE = celery_get_manifest_schedule
    # Development setup:
    if not is_prod:
        CELERY_BROKER_URL = 'redis://localhost:6379/0'
        CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
        REDIS_HOST = 'localhost'
        REDIS_PASSWORD = ''
        REDIS_PORT = 6379
        REDIS_URL = 'redis://localhost:6379/0'

    # Production setup:
    else:
        # Celery:
        CELERY_BROKER_URL = os.environ.get('REDIS_URL')
        CELERY_RESULT_BACKEND = os.environ.get('REDIS_URL')
        # Redis:
        REDIS_URL = os.environ.get('REDIS_URL')

Note: Both the Celery broker URL and the Celery result backend are the same as the Redis URL (I’m using Redis as my message broker), so the environment variable “REDIS_URL” is used for all of them.
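
The same development/production switch can be sketched as a small function, which makes it easy to try out without touching real environment variables. The function name redis_settings and the example.invalid URL are made up for illustration.

```python
def redis_settings(environ):
    """Return the Celery/Redis URLs for a given environment mapping."""
    if environ.get("is_prod"):
        url = environ["REDIS_URL"]        # supplied by the host in production
    else:
        url = "redis://localhost:6379/0"  # local development server
    return {
        "CELERY_BROKER_URL": url,
        "CELERY_RESULT_BACKEND": url,
        "REDIS_URL": url,
    }


dev = redis_settings({})
prod = redis_settings({"is_prod": "True", "REDIS_URL": "redis://example.invalid:6379"})
print(dev["CELERY_BROKER_URL"])
print(prod["CELERY_BROKER_URL"])
```

Passing the mapping in as a parameter, rather than reading os.environ directly, is only a design choice to keep the sketch testable.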

Connecting to the Celery and Redis server:

Now that we’ve created the setup for the Celery and Redis we need to instantiate the Redis object and create the connection to the Redis server.

I also ping the Redis server to check the connection.

getManifest.py:

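import urlparse  # Python 2 module; on Python 3 use urllib.parse instead
import redis
from datetime import timedelta  # used for the periodic task interval
from . import celery
from celery.task.base import periodic_task
from config import config, Config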

# Set Redis connection:
redis_url = urlparse.urlparse(Config.REDIS_URL)
r = redis.StrictRedis(host=redis_url.hostname, port=redis_url.port, db=1, password=redis_url.password)

# Test the Redis connection:
try: 
    r.ping()
    print "Redis is connected!"
except redis.ConnectionError:
    print "Redis connection error!"

Note: I tried to manually add the hostname, port and password as strings to the redis.StrictRedis call; however, this wouldn’t work for me and I could only connect to the Redis server if I used urlparse to split the URL for me (I presume it’s looking for a URL object and not a String object, but I couldn’t figure out why).

The db=1 argument selects Redis database number 1; it defaults to 0 if omitted.

REDIS_URL:

redis://h:p0dc...449@ec2-...-123.compute-1.amazonaws.com:48079

which is in the format:

redis://<username>:<PASSWORD>@<HOST>:<PORT>
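
To see what urlparse pulls out of a URL in this format, here is a minimal sketch. The URL is a made-up example with the same shape, not a real server, and the import fallback is only there so the snippet runs on both Python 2 and Python 3.

```python
try:
    from urllib.parse import urlparse   # Python 3
except ImportError:
    from urlparse import urlparse       # Python 2

# Hypothetical URL with the same shape as the one above.
example_url = "redis://h:s3cret@redis.example.com:48079"

parts = urlparse(example_url)
print(parts.username)   # h
print(parts.password)   # s3cret
print(parts.hostname)   # redis.example.com
print(parts.port)       # 48079
```

Note that the port comes back as an integer while the other fields are strings, which may be relevant when passing them to redis.StrictRedis by hand.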

You can set this as an environment variable on Heroku by using the following command:

heroku config:set REDIS_URL=redis://h:p0dc...449@ec2-...-123.compute-1.amazonaws.com:48079

Creating the asynchronous task:

Here is the definition of the run_get_manifest() function, it’s pretty huge so I won’t include all of the code.

However, the important thing to note is the @celery.task decorator.

@celery.task(name='tasks.async_run_get_manifest')
def run_get_manifest():
    """ Run the entire get_manifest flow as a single function """
    build_path()
    manifest_version = request_manifest_version()
    if check_manifest(manifest_version) is True:
        getManifest()
        buildTable()
        manifest_type = "full"
        all_data = buildDict(DB_HASH)
        writeManifest(all_data, manifest_type)
        cleanUp()
    else:
        print "No change detected!"

    return
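
The decision at the heart of this function is "only rebuild when the version has changed". Here is a minimal sketch of that idea on its own; manifest_changed() and run_update() are hypothetical names and are not the real check_manifest() implementation.

```python
def manifest_changed(stored_version, current_version):
    """True when Bungie's current version differs from the one we stored."""
    return stored_version != current_version


def run_update(stored_version, current_version, rebuild):
    """Call rebuild() only on a change; return the version now stored."""
    if manifest_changed(stored_version, current_version):
        rebuild()
        return current_version
    return stored_version


calls = []
stored = run_update("v1", "v2", lambda: calls.append("rebuilt"))
stored = run_update(stored, "v2", lambda: calls.append("rebuilt"))
print(stored, calls)
```

The second call does nothing because the stored version already matches, which is what keeps a 5 minute polling interval cheap.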

To create the asynchronous function, I create a new function async_run_get_manifest().

Inside this function, I call the original run_get_manifest function through its delay() method. We can access the delay() method because we have wrapped the run_get_manifest() function in the @celery.task decorator.

def async_run_get_manifest():
    """ Asynchronous task, called from API endpoint. """
    run_get_manifest.delay()
    return
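
If it seems odd that a decorator can give a function a delay() method, this toy sketch shows the general idea using only the Python 3 standard library. It is not how Celery is implemented: a thread pool stands in for the worker, and the task decorator here is a made-up stand-in for @celery.task.

```python
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=1)  # stands in for the Celery worker


def task(func):
    """Toy decorator: attach a delay() method that runs func in the background."""
    def delay(*args, **kwargs):
        return executor.submit(func, *args, **kwargs)
    func.delay = delay
    return func


@task
def run_get_manifest():
    return "manifest checked"


print(run_get_manifest())            # direct call: runs now, in this thread
future = run_get_manifest.delay()    # background call: returns a handle immediately
print(future.result())               # wait for the background result
executor.shutdown()
```

The decorated function can still be called directly, which is why periodic_run_get_manifest() can call run_get_manifest() without delay().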

Creating the periodic task:

To create the periodic function, I create a new function periodic_run_get_manifest().

This function is decorated with the @periodic_task decorator. The “run_every” parameter is required and sets the time interval.

@periodic_task(run_every=timedelta(seconds=300))
def periodic_run_get_manifest():
    """ Periodic task, run by Celery Beat process """
    run_get_manifest()
    return

So now I have 2 functions, that do the same thing, but with some important differences:

  1. periodic_run_get_manifest(): This is the periodic task that is run every 5 minutes.
  2. async_run_get_manifest(): This is the asynchronous task that will run in the background when a request is sent to the /updateManifest endpoint.
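
The scheduling side boils down to asking "has a full interval passed since the last run?". Here is a toy sketch of that check; is_due() is a made-up helper for illustration and not Celery Beat’s own code.

```python
from datetime import datetime, timedelta

RUN_EVERY = timedelta(seconds=300)


def is_due(last_run, now, interval=RUN_EVERY):
    """True when at least one full interval has passed since last_run."""
    return now - last_run >= interval


last_run = datetime(2017, 11, 22, 13, 33, 8)
print(is_due(last_run, datetime(2017, 11, 22, 13, 36, 0)))   # False
print(is_due(last_run, datetime(2017, 11, 22, 13, 38, 8)))   # True
```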

Starting the Celery workers:

To start the Celery workers, you need both a Celery worker and a Beat instance running in parallel. Here are the commands for running them:

celery worker -A celery_worker.celery --loglevel=info
celery beat -A celery_worker.celery --loglevel=info

Now that they are running, we can execute the tasks.

Calling the asynchronous task:

The asynchronous task will be called any time an authorised account visits a designated endpoint; I’m using the endpoint “/updateManifest”. This will call the asynchronous task “async_run_get_manifest()”, which will be executed in the background.

api_1_0/views.py:

You’ll need to implement a feature to detect if the user is authorised to access this endpoint, I’ve left that out for clarity’s sake.

In this case I return to the index.html page. Depending on how your API is set up, you may return a text or JSON response instead. I had a system in place where I would receive update messages on a private Slack channel, depending on how the update went.

@api.route('/updateManifest')
@login_required
def updateManifest():
    # Queue the background task, then return straight away:
    async_run_get_manifest()
    return render_template('index.html', site_details=site_details)

Executing the periodic task:

The Periodic task will be executed every 5 minutes when the Celery Beat scheduler is running. Here I can check the progress from the Celery output:

[2017-11-22 13:38:08,000: INFO/MainProcess] Received task: app.getManifest.periodic_run_get_manifest[97a82703-af22-4a43-b189-8dc192f55b84]
[2017-11-22 13:38:08,059: INFO/Worker-1] Starting new HTTPS connection (1): www.bungie.net
[2017-11-22 13:38:10,039: WARNING/Worker-1] Detected a change in version number: 60480.17.10.23.1314-2
[2017-11-22 13:38:10,042: INFO/Worker-1] Starting new HTTPS connection (1): slack.com
[2017-11-22 13:38:10,803: WARNING/Worker-1] Detected a change in mobileWorldContentPaths: [u'en']

We can see from here:

  • The worker received the task app.getManifest.periodic_run_get_manifest.
  • It created a new HTTPS connection to www.bungie.net; this is the request to check the Manifest version.
  • Next, I check the version number of the Manifest and print the line “Detected a change in version number: 60480.17.10.23.1314-2”.
  • It created a new HTTPS connection to slack.com, and I send this line to Slack as a message.
  • Next, I check the mobileWorldContentPaths version for the English Manifest and print the line “Detected a change in mobileWorldContentPaths: [u’en’]”.

In my case I didn’t need my app to keep track of the task status or check if it’s completed correctly, but Celery has that option. I get updates from the Slack messages.

Creating a development Start up script:

Here’s the script I use to start up the development server, Redis server, Celery worker and Celery Beat worker. Save the following into a file called “Startup.bat” and you can just double click on the file icon to start each process, in its own window, in your development environment.

This can save a lot of time compared to opening 4 command windows and starting each process separately.

C:\
cd /d "C:\Users\AllynH\Documents\Python\Flask\DestinyVaultRaider_Redis_Celery_API"
start /K redis-cli shutdown
timeout /t 5 /nobreak
start CMD /K redis-server
start CMD /K celery worker -A celery_worker.celery --loglevel=info
start CMD /K celery beat -A celery_worker.celery --loglevel=info
start CMD /K python manage.py runserver

Here’s a breakdown of what the script is doing:

  • The first two lines change to our working directory.
  • Next I shut down any existing Redis server (sometimes Redis wouldn’t start correctly unless I had done a shutdown first).
  • Then I wait for 5 seconds to allow the Redis server to shut down.
  • The next 4 commands start the Redis server, Celery worker, Celery Beat worker and Flask server, each in its own command shell.

Redis server, Celery workers and Flask server started via the Startup.bat script.

Running on Heroku:

Here are some Heroku-specific changes; you can skip these if you’re not running on Heroku.

Creating a Redis broker and adding it to the app:

You’ll need to create a Redis broker and attach it to your app. This will give you the REDIS_URL mentioned above.

heroku addons:create heroku-redis -a destinyvaultraider

Editing the Procfile:

To start the Celery worker and Beat processes, add the following to your Procfile:

worker: celery worker -A celery_worker.celery --beat --loglevel=info

Note: we can kick off both the Celery worker and Beat scheduler in one command here, whereas we couldn’t on Windows.

Scaling the Worker dyno:

To start the process, you need to enable the Celery worker Dyno:

heroku ps:scale worker=1

Now your background tasks should be up and running!

Note on running Celery and Redis on Heroku:

The pricing on Heroku is expensive to the point of being prohibitive. I have had to disable the Redis database and Celery worker, as Heroku requires you to pay separately for each of these.

For example the pricing for the package I wanted worked out like this (as of November 2017), all figures are per month:

  • Hobby Dyno: $7 (Required for HTTPS certification).
  • Celery worker: $7.
  • Redis database 100MB: $30 (80MB required).

This is obviously not feasible for a hobby project that isn’t making any money.

In comparison, DigitalOcean, Vultr and OVH provide Virtual Private Server services from ~$5 per month, which would allow you to run Redis and Celery inclusive of that price.
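
Adding up the figures quoted above makes the gap clear. This is just the arithmetic on the November 2017 prices listed in this post, with the VPS figure taken as a flat $5.

```python
# Monthly Heroku prices quoted above (USD, November 2017).
heroku_costs = {
    "Hobby Dyno": 7,
    "Celery worker": 7,
    "Redis database 100MB": 30,
}
vps_cost = 5  # approximate entry-level VPS price quoted above

heroku_total = sum(heroku_costs.values())
print(heroku_total)              # 44
print(heroku_total - vps_cost)   # 39
```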

So before you invest your time in Heroku, research some of the alternatives 🙂

Creating a Python app for Destiny – Part 8: Displaying the Vault contents.

Introduction:

Big news: I’ve managed to launch my own Destiny based website!!! I’ve been working on an inventory management system for a while now, and following on from my previous blog posts I’ve managed to deploy my work to date as a live website.

You can test it out for yourself at https://www.destinyvaultraider.com. Once you’ve authorised your account, you can click on Vault, Character or Xur (if he’s around).

The aim of this website is to use it as a live learning tool, where I can continue to develop and add new features. For example, I can display the vault and character inventory but I can’t transfer items yet, so this is a feature I hope to add in the future.

If you’d like to review the previous Destiny API posts, check them out here:

  1. Send a request to read Xurs inventory.
  2. Send a HTML formatted email with Xurs inventory.
  3. Log into Destiny and Bungie.net via your PSN account.
  4. Transfer items from your vault to your character and equip them.
  5. Reading a characters inventory and vault contents.
  6. Creating a Python web server with Flask.
  7. Authenticating our app with Bungie.net OAuth.

You can also find me on Twitter here @Allyn_H_

Flow chart:

This section is quite confusing, as I’ll be making a request to Bungie, then decoding the response and changing its format a few times before plugging it into my Flask app, so here’s a quick guide as to how I’m creating the Vault route. Excuse the colours, I’m representing Future War Cult.

Future War Cult representing!

Flow chart detailing the steps for creating the vault page.

Create vault route:

In the previous section we created an index route, which was used to display the index.html page. Now that we have authorised the user, we can build on this and add a new route to display the user’s Vault contents.

To create the Vault route, the code looks like this:

@main.route('/vault')
def vault():
    :
    # Do something...
    :
    return render_template('vault.html') 

Right now, ‘vault.html’ is an empty file that extends from our ‘base.html’ – it will only display our navbar. For a quick refresh on the Flask setup, check out this post: Creating a Python app for Destiny – Part 7: Authenticating our app with Bungie.net OAuth.

At the moment, the “vault.html” template looks like this:

{% extends "base.html" %}
{% block content %}

{% endblock %}

We are extending from the “base.html” template and our content block is empty. Now we can build on this and add some content.

In order to build our Vault page we will need to do a few things:

  1. Send a request to Bungie.net to get the user’s account details:
    1. We need the user’s membershipType, destinyMembershipId and characterId.
  2. Send a request to Bungie.net to get the user’s Vault contents.
  3. Parse through the user’s vault contents and take out the data we wish to display.
    • Again, we’ve done this before; we just need to tweak the code a bit.
  4. Categorise and display each item.
    • The JSON response from Bungie doesn’t return the items in any order, so we need to categorise them in order to display them properly.

In order to better categorise the vault items, I created a dictionary of each item category. These categories are stored in the item details returned from the manifest, as item['bucket']. I’ve split these out as below:

invItems = {
    1 : 'Primary Weapons', 
    2 : 'Special Weapons', 
    3 : 'Heavy Weapons', 
    4 : 'Ghost', 
    5 : 'Helmet', 
    6 : 'Gauntlets', 
    7 : 'Chest Armor', 
    8 : 'Leg Armor', 
    9 : 'Class Armor', 
    10 : 'Artifacts', 
    11 : 'Vehicle', 
    12 : 'Sparrow Horn', 
    13 : 'Ships',
    14 : 'Shaders',
    15 : 'Emblems',
    16 : 'Emotes',
    17 : 'Weapon Bundles',
    18 : 'Materials',
    19 : 'Consumables',
    20 : 'Ornaments',
    }

Each of these categories will be used to create a section in our vault page. We will loop through these categories and create the HTML needed to display each item. First though, we need to get the user’s vault details.

1: Send a request to Bungie.net to get the users account details:

Once the user has authenticated their account, we can request information about their account.

Send a request to Bungie.net to get the users account details

Once the user has been authenticated by Bungie, we can send a request to Bungie to get the account details of the current user. This will return some important information like the user’s PSN / Xbox Live username, membershipType, destinyMembershipId, and all of the user’s character IDs.

Here’s the code to send the “GetCurrentBungieAccount” request:

def GetCurrentBungieAccount(session):
    req_string = 'https://www.bungie.net/Platform/User/GetCurrentBungieAccount/'
    res = session.get(req_string)
    error_state = res.json()['ErrorStatus'].decode('utf-8')
    return res

There’s nothing there we haven’t seen before. The function takes the authenticated session as a parameter and returns the response from Bungie. As we can see, there’s a lot of important account data returned that will enable us to view and interact with the user’s account:

Lots of important user data returned here.

The response from GetCurrentBungieAccount returns a lot of important account data.

This endpoint will also give character details, such as race, gender and class, details of the user’s clan affiliation, the user’s Grimoire and more.

2: Send a request to Bungie.net to get the users Vault details:

Request and Response

Sending a request to Bungie.net to get the users Vault details

Now that we have the user’s membershipType and destinyMembershipId, we can send a request to Bungie for their vault contents.

def getVault(session, membershipType, destinyMembershipId):
    getVault_url = base_url + membershipType + "/MyAccount/Vault/"
    res = session.get(getVault_url, params={'accountId': destinyMembershipId})
    return res

Again we send a request, with the user’s membershipType added to the URL path and the destinyMembershipId added as a query parameter. The function then returns the response from Bungie.
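
To see roughly what URL ends up being requested, here is a minimal sketch that builds it by hand. The base_url below is a placeholder (the real value is defined elsewhere in the app and isn’t shown here), and build_vault_url() is a made-up helper for illustration.

```python
try:
    from urllib.parse import urlencode   # Python 3
except ImportError:
    from urllib import urlencode         # Python 2

# Placeholder only: the real base_url is defined elsewhere in the app.
base_url = "https://example.invalid/base/"


def build_vault_url(membership_type, destiny_membership_id):
    """Show the URL that a GET with params={'accountId': ...} would request."""
    path = base_url + str(membership_type) + "/MyAccount/Vault/"
    return path + "?" + urlencode({"accountId": destiny_membership_id})


print(build_vault_url(2, "1234567890"))
```

The str() call is there because concatenating an integer membershipType onto a string would otherwise raise a TypeError.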

3: Parse the Vault response and return only the required data:

Now that we have a response from Bungie, we need to convert this into a human-readable format. The JSON response from the getVault request (in my case) is 35,147 lines of text. That’s a lot of data, and very little of it is human readable: it doesn’t return the item names, for example, just an itemHash value. So we now need to pull out each item hash and look it up in the manifest to get the human-readable version of that data. The response after decoding these items from the Manifest is 54,150 lines of text, so we’ll need to extract only the lines of data we’re interested in. Each item will be condensed down to 10 pieces of useful information (we can build on this at a later stage if needed).

Parsing manifest and categorising the data.

Decoding the itemHash from the manifest and stripping out the required data.

One of the things we can also do, to make life a little easier for us, is to categorise each item by its “bucketName”  – this is a value stored in the manifest that is used to categorise each item, for example “Primary Weapon”, “Consumables” or “Ghost”. The response from getVault is split into 3 buckets, “Weapons,” “Armor” and “Inventory”. This can be seen when you visit your vault in the tower or on any app.  In our case, we’re going to add them all to 1 page, with the ‘bucketName’ (i.e. “Primary Weapon”, “Chest Armor” or “Consumables”) used to categorise each item.

First, let’s create a list of blank dictionary objects with the items we want to display for our vault:

# vaultSize (the number of items in the vault) is assumed to be set earlier.
weapon_list = [{
    'itemReferenceHash': 0,
    'itemId': 0,
    'itemName': '',
    'tierTypeName': '',
    'itemTypeName': '',
    'itemLightLevel': '',
    'stackSize': 1,
    'icon': '',
    'bucket': '',
    'equipped': '',
} for _ in range(vaultSize)]
# Index of the next free slot in weapon_list:
array_size = 0
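
One detail worth noting is why a list comprehension is used here rather than multiplying a one-element list. A minimal sketch with a cut-down template dictionary and a made-up item name:

```python
template = {"itemName": "", "stackSize": 1}

# A comprehension builds a separate dict for every slot.
independent = [dict(template) for _ in range(3)]
independent[0]["itemName"] = "Example Item"
print(independent[1]["itemName"])   # still empty

# Multiplying a one-element list repeats the same dict three times.
shared = [dict(template)] * 3
shared[0]["itemName"] = "Example Item"
print(shared[1]["itemName"])        # also changed
```

With the shared version, every slot in the vault would end up showing whichever item was written last.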

Now that the list is created, let’s loop through each inventory item, decode the values from the manifest and store the data we want to display.

Nested JSON dicts and lists...

The JSON response from getVault shows the itemHash we need to decode.

We are also going to look up each “itemHash” in the manifest to get the full item details. You can read back to part 5 for a refresher on what I’m doing here.

If we look at the JSON response from the getVault request, shown in the picture above, both “buckets” and “items” are lists of nested dictionary items, so we need to loop through “buckets” and then each of the “items” to get to the item’s “itemHash”. (You can see they’re lists because the square brackets “[” denote the start of a list.)

The code for looping through the nested lists is below:

for bucket in vaultResult.json()['Response']['data']['buckets']:
    for item in bucket['items']:
        weapon_list[array_size]['itemReferenceHash'] = item['itemHash']
        inventoryItem = all_data['DestinyInventoryItemDefinition'][item['itemHash']]
        weapon_list[array_size]['itemName'] = inventoryItem['itemName']

Once we have the “itemHash” we can query the Manifest definition “DestinyInventoryItemDefinition” for the item details. We can then store the information we want to keep in our own list of dictionary items.
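
Here is the same nested loop run against a tiny made-up response, so you can see the shape of the data without needing a real Vault. The hashes, names and the sample_definitions dictionary are all invented for illustration.

```python
# Made-up response with the same nesting as the loop above.
sample_response = {
    "Response": {"data": {"buckets": [
        {"items": [{"itemHash": 111}, {"itemHash": 222}]},
        {"items": [{"itemHash": 333}]},
    ]}}
}
# Made-up stand-in for the manifest definitions.
sample_definitions = {111: "Item A", 222: "Item B", 333: "Item C"}

names = []
for bucket in sample_response["Response"]["data"]["buckets"]:
    for item in bucket["items"]:
        names.append(sample_definitions[item["itemHash"]])

print(names)   # ['Item A', 'Item B', 'Item C']
```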

Stripping out the important data:

The function parseVault takes 3 parameters: the authorised “session”, the “vaultResult” JSON response and the Manifest data in the format of a Python dictionary as “all_data”. Here’s what the full code for stripping each item from the manifest looks like:

def parseVault(session, vaultResult, all_data):
    # Start filling weapon_list (created above) from the first slot:
    array_size = 0
    for bucket in vaultResult.json()['Response']['data']['buckets']:
        for item in bucket['items']:
            weapon_list[array_size]['itemReferenceHash'] = item['itemHash']
            inventoryItem = all_data['DestinyInventoryItemDefinition'][item['itemHash']]
            weapon_list[array_size]['itemName'] = inventoryItem['itemName']
            weapon_list[array_size]['itemLightLevel'] = item.get('primaryStat', {}).get('value', "")
            if ((inventoryItem['itemName'] != "Classified") and (inventoryItem['itemHash'] != 1826822442)):
                bucketHash = all_data['DestinyInventoryBucketDefinition'][inventoryItem['bucketTypeHash']]
                weapon_list[array_size]['itemName'] = inventoryItem['itemName']
                weapon_list[array_size]['tierTypeName'] = inventoryItem['tierTypeName']
                weapon_list[array_size]['itemTypeName'] = inventoryItem['itemTypeName']
                weapon_list[array_size]['icon'] = "https://www.bungie.net/" + inventoryItem['icon']
                weapon_list[array_size]['bucket'] = bucketHash['bucketName']
            # Classified items won't have this information, if not overwritten can cause fails:
            if ((inventoryItem['itemName'] == "Classified") or (inventoryItem['itemHash'] == 1826822442)):
                weapon_list[array_size]['itemName'] = inventoryItem['itemName']
                weapon_list[array_size]['tierTypeName'] = "Classified"
                weapon_list[array_size]['itemTypeName'] = "Classified"
                weapon_list[array_size]['bucket'] = "Classified"
            array_size += 1 
    return weapon_list

Also note, as I’ve hit a few issues recently with Bungie releasing new items – but forgetting to change them from “Classified” in the Manifest, I’ve added a default value for any items with an “itemName” of “Classified” – this will prevent any errors occurring if Bungie add a Classified item at a later stage.
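
A different way to get the same protection, shown here only as an alternative sketch, is to lean on dict.get() with a default, as the code above already does for primaryStat. The describe_item() helper and the sample definitions are made up for illustration.

```python
CLASSIFIED_DEFAULT = "Classified"


def describe_item(definition):
    """Return display fields, falling back to 'Classified' when one is missing."""
    return {
        "itemName": definition.get("itemName", CLASSIFIED_DEFAULT),
        "tierTypeName": definition.get("tierTypeName", CLASSIFIED_DEFAULT),
        "itemTypeName": definition.get("itemTypeName", CLASSIFIED_DEFAULT),
    }


# Made-up definitions: one complete, one with only a name.
full = describe_item({"itemName": "Item A", "tierTypeName": "Legendary", "itemTypeName": "Helmet"})
classified = describe_item({"itemName": "Classified"})
print(full)
print(classified)
```

This only helps when the keys are missing altogether; if the manifest returns the keys with placeholder values, the explicit “Classified” check above is still needed.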

Populating the vault route with our data:

Now that we have stripped the important data out of the manifest, into a list of dictionaries, we can pass this list to the ‘vault’ route in the ‘views.py’ file. As a side note, it would have been possible to populate the Vault route without stripping out this data, but it’s much harder to parse through lists of nested dictionary items in the Jinja2 template than in Python. Also, if we were creating a high traffic site, it’s not good practice to send large files of data every time the user refreshes a webpage.

In the case of our Vault route, this could get to about 5MB per page view… Users on mobile data wouldn’t be happy about that, so we need to cut this down.

Creating the Flask Vault route with data taken from the manifest.

Populating the Vault route in our Flask app with the data stripped from the manifest.

Now that we have our important data stripped out and stored in a list of dictionary objects, we need to pass this list to the render_template function of the vault route as a parameter. The “invItems” dictionary, containing the item categories, is also passed as a parameter.

@main.route('/vault')
def vault():
    userSummary = GetCurrentBungieAccount(oauth_session)
    vault = getVault(oauth_session, user.membershipType, user.destinyMembershipId)
    weaponList = parseVault(oauth_session, vault, all_data)
    # Parse the account response once and reuse it for the template parameters:
    account = userSummary.json()['Response']['destinyAccounts'][0]
    firstCharacter = account['characters'][0]
    return render_template('vault.html',
        weaponList = weaponList,
        invItems = invItems,
        character = account['userInfo']['displayName'],
        characterHash = characterHash,
        charId = firstCharacter['characterId'],
        lightLevel = firstCharacter['powerLevel'],
        emblemImage = firstCharacter['emblemPath'],
        backgroundImage = firstCharacter['backgroundPath'],
        )

The other parameters (character, characterHash, charId, lightLevel, emblemImage and backgroundImage) are all taken from the response from GetCurrentBungieAccount() and were shown in the last blog post, Part 7: Authenticating our app with Bungie.net OAuth.
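
To show where those values sit in the response, here is a minimal sketch that picks a few of them out of a made-up payload. The payload contains only keys that appear in the route above, the values are invented, and summarise_account() is a hypothetical helper.

```python
def summarise_account(payload):
    """Pick the display fields out of an account-response-shaped dict."""
    account = payload["Response"]["destinyAccounts"][0]
    first_character = account["characters"][0]
    return {
        "character": account["userInfo"]["displayName"],
        "charId": first_character["characterId"],
        "lightLevel": first_character["powerLevel"],
    }


# Made-up payload containing only the keys used by the route above.
sample = {"Response": {"destinyAccounts": [{
    "userInfo": {"displayName": "ExampleGuardian"},
    "characters": [{"characterId": "1001", "powerLevel": 400}],
}]}}
summary = summarise_account(sample)
print(summary)
```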

Displaying an item in HTML:

To display each item, I created a HTML template file called “itemBlock.html”

The code is quite basic: 2 <div>s, an <img> and 2 <p>s. The first <div> sets out a Bootstrap column; col-md-3 will allow 4 images to be tiled per row before moving to a new row. For the image, I’m passing a lot of parameters but I’m not actually using them yet, as I haven’t found a tidy way to display all of the information. In the <p> paragraph sections, I’m displaying the itemName, itemLightLevel, tierTypeName and itemTypeName.

Here’s the code in the “itemBlock.html”:

<div class="col-md-3 col-sm-4 col-xs-6">
    <div class="thumbnail">
        <img class="img-responsive med-tile" src="{{ dict_item['icon'] }}" title="{{ dict_item['itemName'] }}" tierTypeName="{{ dict_item['tierTypeName'] }}" itemTypeName="{{ dict_item['itemTypeName'] }}" tier="{{ dict_item['tier'] }}" bucket="{{ dict_item['bucket'] }}" itemLightLevel="{{ dict_item['itemLightLevel'] }}" itemReferenceHash="{{ dict_item['itemReferenceHash'] }}">
        <p>{{ dict_item['itemName'] }}: {{ dict_item['itemLightLevel'] }}</p>
        <p>{{ dict_item['tierTypeName'] }} {{ dict_item['itemTypeName'] }}</p>
    </div>
</div>

Here’s what that will look like when displayed:

WOOT Fragment of the Prime!

Here’s what the output from the ‘itemBlock.html’ template file looks like.

Edit vault.html template to display all items:

To display the vault contents in the correct order, we need to step through the invItems list and pick out each item of each category.

Here’s the code in the “vault.html” file:

<div class="inventory-container">
{% for item in invItems -%}
    {% for dict_item in weaponList -%}
 
        {% if invItems[item] in dict_item['bucket'] -%}
            {% include 'itemBlock.html' -%}
        {% endif -%}

    {% endfor -%} 
{% endfor -%}
 
</div>

In the code below, we test to see if the current item in our list is one of the category of items we want to display:

{% if invItems[item] in dict_item['bucket'] -%}
    {% include 'itemBlock.html' -%}
{% endif -%}

If so we include the “itemBlock.html” file to display the item.
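
To make the ordering logic easier to follow, here is the same nested loop as a plain Python sketch (Python 3). It assumes that invItems maps a category label to a bucket name and that each item’s 'bucket' value is a string; the sample names below are made up for illustration.

```python
def order_items(inv_items, weapon_list):
    """Return the vault items grouped in the order the categories are listed."""
    ordered = []
    for category in inv_items:              # outer loop: each category in turn
        for dict_item in weapon_list:       # inner loop: every item in the vault
            # Same test as the template: does this item belong to the category?
            if inv_items[category] in dict_item['bucket']:
                ordered.append(dict_item)
    return ordered

# Hypothetical sample data.
inv_items = {'primary': 'Primary Weapons', 'special': 'Special Weapons'}
weapon_list = [
    {'itemName': 'Item B', 'bucket': 'Special Weapons'},
    {'itemName': 'Item A', 'bucket': 'Primary Weapons'},
    {'itemName': 'Item C', 'bucket': 'Special Weapons'},
]

names = [item['itemName'] for item in order_items(inv_items, weapon_list)]
print(names)
```

The whole item list is scanned once per category, which is fine for a vault of a few hundred items. Note that this relies on dictionaries keeping their insertion order, which is only guaranteed from Python 3.7 onwards.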

Viewing our web site:

As usual, the full set of code can be found on my GitHub page:

https://github.com/AllynH/Destiny_Flask_Webserver

There are a few files / folders, so you’ll need to download it from there.

To run the code you can type:

python app.py

This will start the Flask Web Server, you’ll see some output like this:

* Restarting with stat
Opening Manifest...
Finished!
 * Debugger is active!
* Running on https://127.0.0.1:5000/ (Press CTRL+C to quit)

First, click on the “Authenticate with Bungie” link and follow the instructions to authenticate your account.

Index page view:

Here is the view of the index page.

You should see a screen like this, asking you to review and approve the permissions required by this app.

Authorise your app.

Don’t forget to review what permissions you’re giving the app.

You should then be redirected back to the index page.

Now that you’ve been authorised and logged into Bungie via our app, we can open the link “view your vault contents”, you’ll see something like this:

Vault view.

Here’s what the vault view currently looks like!

Next steps:

Right now, we’ve created the basic shell of the website / app. This is a good start and we can build onto this, add features, create new pages.

The next steps for me will be to add more functionality to the website:

  • Character inventory view.
  • Xur inventory view.
  • Finish the refresh token flow, so users don’t need to authenticate every 30 mins.
  • Add a user database to securely store the users refresh tokens.
  • Add a transfer item feature.

All of the above is finished (except the transfer item feature), I just need to write up the blog post 🙂

Check out my website:

https://www.destinyvaultraider.com/

Creating a Python app for Destiny – Part 7: Authenticating our app with Bungie.net OAuth

Introduction:

Bungie are moving away from their cookie based authentication flow and have created a new OAuth 2.0 style flow. This allows for a safer, more standardised approach to authenticating users with Bungie.net. Read the Bungie.net authentication release article here.

Bungie outline some of the benefits of using the new authentication flow:

  • It uses OAuth 2.0 style sign in flow, and does not depend on fragile cookies.
  • It gives the user a chance to review the scope of permissions granted to an application so they can understand what the application can do on their behalf.
  • Users can review all write operations performed by an application, and disable apps they no longer wish to have access.
  • It’s sanctioned by Bungie, and we will smile upon apps that use this mechanism instead of cookies. Also, this is the only mechanism permitted by our terms of service for application developers to make use of APIs that require authentication.

If you’d like to review the previous Destiny API posts, check them out here:

  1. Send a request to read Xurs inventory.
  2. Send a HTML formatted email with Xurs inventory.
  3. Log into Destiny and Bungie.net via your PSN account.
  4. Transfer items from your vault to your character and equip them.
  5. Reading a characters inventory and vault contents.
  6. Creating a Python web server with Flask.

You can also find me on Twitter here @Allyn_H_

Also, many thanks to all the people involved in the Destiny Item Manager development team (too many to name individually) and Vlad from Destiny Trials Report,  for their help debugging some of the issues with the OAuth flow!

Creating an app on Bungie.net:

To create an app on Bungie, you need to visit the developer page here. If you’ve previously created one, you’ll need to update it for the new authentication flow.

Fill in the required details:

  1. Application name: Give your app a recognisable name.
  2. Application status: Set it to private unless you plan to deploy this app.
  3. Website: A place where people can find details of your app.
  4. Redirect URL: You’ll need to set a HTTPS redirect URL for your app in order to complete OAuth authorisation.

Setting Callback URL and permissions.

I’ve added all the permissions here, as I’m also using this to test other features – you may not need all these permissions.

Then click on the save changes button at the bottom.

Hooray, your app is created!

App is created, now let’s get coding!

Now, let’s get to coding up the flow…

Creating the authorisation URL:

When you registered your app on Bungie, you should have been given an authorisation URL with a unique number, like this: https://www.bungie.net/en/Application/Authorize/1234.

We are going to take that URL and add some parameters to it before we make our authorisation request. Here’s what the updated index route looks like:

AUTH_URL = 'https://www.bungie.net/en/Application/Authorize/1234?'

@app.route('/')
@app.route('/index')
def index():
    state = make_authorization_url()
    state_params = {'state': state}
    url = AUTH_URL + urllib.urlencode(state_params)
    return render_template('index.html', url=url)
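
The route above uses the Python 2 call urllib.urlencode. As a small sketch of the same URL building step on Python 3 (where the function lives in urllib.parse), with a hypothetical helper name build_auth_url and the placeholder application number 1234 from above:

```python
from urllib.parse import urlencode

AUTH_URL = 'https://www.bungie.net/en/Application/Authorize/1234?'

def build_auth_url(state):
    """Append the state value to the authorisation URL as a query parameter."""
    return AUTH_URL + urlencode({'state': state})

print(build_auth_url('abc123'))
```

urlencode also takes care of escaping any characters that are not safe in a URL, which is why the state is passed through it rather than concatenated directly.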

Before we look at what the state parameter is doing, here’s how we pass the URL to our HTML “index.html” template, via the render_template method.

 Creating the index page:

I’m going to build on the templates made in the previous blog post, you can review them here. The index.html page is left deliberately bare, there’s a lot of cool stuff we could do here but let’s get our basic program up and running first. In our index.html template file, we can create a hyperlink to authorise our account like this:

{% block page_content %}

<div>
    <h2><a href="{{ url }}">Authenticate with Bungie</a></h2><br><br>
</div>


{% endblock %}

The value for {{ url }} will be passed in from the render_template method, and the hyper link will be created.

Preventing Cross Site Request Forgery with the state parameter:

As an extra precaution, we are going to add some Cross Site Request Forgery (CSRF) protection for the visitors of our site. There is a great post about this, referenced in the Bungie authentication article; you can read the post here.

Here’s how we are going to handle the state parameter of our authorisation request and callback:

  1. Create a state value, a random string of numbers and letters.
  2. Save this state value in the user’s secure HTTPS session.
  3. Add this parameter to the authorisation URL when we direct the user to the Bungie authorisation URL.
  4. When the user is redirected back to our site via the callback URL, Bungie will echo the state parameter back to us.
  5. Compare the state parameter from Bungie and the state parameter we stored in the user’s session.

If the state parameters are the same, we know the callback belongs to an authorisation request that we started for this user, and nothing dodgy is happening.
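
The five steps above can be sketched end to end without Flask. This is only an illustration (Python 3): a plain dictionary stands in for the Flask session, example.com stands in for our own callback host, and the function names start_authorisation and callback_is_genuine are made up for this sketch.

```python
from urllib.parse import parse_qs, urlencode, urlparse
from uuid import uuid4

AUTH_URL = 'https://www.bungie.net/en/Application/Authorize/1234?'

def start_authorisation(session):
    """Steps 1-3: create the state, store it, and add it to the URL."""
    state = str(uuid4())
    session['state_token'] = state
    return AUTH_URL + urlencode({'state': state})

def callback_is_genuine(session, callback_url):
    """Steps 4-5: read the echoed state and compare it with the stored one."""
    returned = parse_qs(urlparse(callback_url).query).get('state', [None])[0]
    return returned is not None and returned == session.get('state_token')

session = {}  # stand-in for the Flask session
auth_url = start_authorisation(session)
state = parse_qs(urlparse(auth_url).query)['state'][0]

# A callback that echoes our state, and one that does not.
good = 'https://example.com/callback/bungie?code=abc&state=' + state
forged = 'https://example.com/callback/bungie?code=abc&state=not-ours'
print(callback_is_genuine(session, good), callback_is_genuine(session, forged))
```

A callback with a missing or different state value is rejected, which is the case the 403 response is meant to cover.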

Here’s what the state parameter looks like in action:

State parameter in action during app authorisation.

Here we can see the state parameter in the URL.

Here’s what it looks like when Bungie redirects the user to the callback URL:

State parameter in callback.

User is redirected to the callback URL, then redirected to the /index route.

Let’s code this up –  here’s what our index view looks like (deliberately basic):

@app.route('/')
@app.route('/index')
def index():
    state = make_authorization_url()
    state_params = {'state': state}
    url = AUTH_URL + urllib.urlencode(state_params)
    print url
    return render_template('index.html', url=url)

The first thing we do is call the function make_authorization_url() and store the result in the “state” variable.

Inside make_authorization_url(), we create a random UUID (Universally Unique Identifier) using the uuid4() function from Python’s “uuid” module. A UUID is a 128 bit value; converted to a string it is 36 characters long (32 hexadecimal digits and 4 hyphens). We store this string in a variable called “state”.

Now that we have generated the state string, we’ll need to store it, so we call the function save_created_state(state) and pass it the “state” value we just generated. We also return the state value, so the index route can add it to the authorisation URL.

Here’s what the code looks like:

 
def make_authorization_url():
    # Generate a random string for the state parameter
    from uuid import uuid4
    state = str(uuid4())
    save_created_state(state)
    return state 

Now that we have generated the “state” value, we need to store this in the session, here’s what the code for save_created_state() looks like:

# Save state parameter used in CSRF protection:
def save_created_state(state):
    session['state_token'] = state

Now that we have generated the state parameter, we can generate our authorisation URL and send the user to Bungie to be authenticated.

Handling a callback:

Once we have sent our request for authorisation, our server will listen on the callback URL for a response. The response from Bungie will also echo our CSRF state parameter back to us, so we are sure the response is from Bungie.

The callback route should read the state parameter and test to see if it’s the same parameter we stored in the user’s session. If it’s not the same state parameter, we send a HTTP 403 “forbidden” response. If the state parameter is the same as the one we sent, we can then read the authorisation code from the callback and exchange it for the access token.

@app.route('/callback/bungie')
def bungie_callback():
    # Read the state value that Bungie echoed back in the callback URL.
    state = request.args.get('state')
    if not is_valid_state(state):
        print "Uh-oh, this request wasn't started by us!"
        abort(403)
    # The state has done its job, so remove it from the session.
    session.pop('state_token', None)
    # Swap the authorisation code for an access token.
    code = request.args.get('code')
    token = get_token(code)
    return redirect(url_for('index'))

The method  is_valid_state(state) is used to check that the state echoed back from Bungie is the same as the state value we have stored in our session.

Below you can see we are passing in the returned state value and comparing it to the “saved_state” value taken from the session:

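def is_valid_state(state):
    # Use .get() so a missing token fails the check instead of raising a KeyError.
    saved_state = session.get('state_token')
    if state is not None and state == saved_state:
        print "States match, you are who you say you are!"
        return True
    else:
        return False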

If the states match, we return “True” and send our request for the access_token, if the states do not match, we return “False” and throw our 403 error.

The code “session.pop(‘state_token’, None)” removes the state parameter from the session, as we don’t need it anymore. The “None” is just the default value returned if the key is not there, which stops pop() from raising an error.
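
As an optional variation (not what the code above does), the check and the clean-up can be combined so that a state value can only ever be used once. The sketch below is Python 3, uses a plain dictionary in place of the Flask session, and the helper name consume_state is hypothetical. It also swaps the == comparison for hmac.compare_digest from the standard library, which compares in constant time.

```python
import hmac

def consume_state(session, returned_state):
    """Remove the saved state from the session and check it against the echo."""
    saved_state = session.pop('state_token', None)   # gone after the first use
    if saved_state is None or returned_state is None:
        return False
    # Compare as bytes so unexpected non-ASCII input cannot raise a TypeError.
    return hmac.compare_digest(saved_state.encode('utf-8'),
                               returned_state.encode('utf-8'))

session = {'state_token': 'abc123'}
first = consume_state(session, 'abc123')
second = consume_state(session, 'abc123')   # replaying the same callback
print(first, second)
```

Because the token is popped before the comparison, a second callback carrying the same state fails the check.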

Getting the Access Token from the Authorisation Code:

Now that the user has authorised their account and we have received the authorisation code from Bungie.net, we can swap this authorisation code for the access token. This relates to steps 4 & 5 of the Bungie.net authorisation flow.

First, we take the authorisation code and add it as the value of a Python dictionary object “post_data” – this dictionary object will be added to the body of the post request and transmitted as a JSON object.

def get_token(code):
    HEADERS = {"X-API-Key":'MY-API'}
    post_data = {'code': code}
    response = requests.post(access_token_url, json=post_data, headers=HEADERS)
    :

Now that we have made a request for the token – lets look at the response:

We've got the access_token!

JSON response containing access_token and refresh_token.

The section we are most interested in (right now) is:

    "accessToken": {
        "readyin": 0, 
        "expires": 3600, 
        "value": "COoJEo ... j8w=="
 }

We now have the access token needed to make any authorised API request! The response contains a bit more information too; the access token is ready in 0 seconds – which means we can use it immediately. The access token will expire in 3600 seconds – 1 hour from now. The access token value, is of course, the really long string.
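
To show how the “readyin” and “expires” values turn into actual times, here is a short sketch (Python 3). The function name token_window is hypothetical, the token value is a placeholder, and a fixed timestamp is used instead of datetime.now() so the result is repeatable.

```python
from datetime import datetime, timedelta

def token_window(access_token, now):
    """Return (ready_at, expires_at) for an accessToken section of the response."""
    ready_at = now + timedelta(seconds=int(access_token['readyin']))
    expires_at = now + timedelta(seconds=int(access_token['expires']))
    return ready_at, expires_at

# Same shape as the "accessToken" section above, with a placeholder value.
access_token = {'readyin': 0, 'expires': 3600, 'value': 'placeholder-token'}

now = datetime(2017, 1, 1, 12, 0, 0)
ready_at, expires_at = token_window(access_token, now)
print(ready_at, expires_at)
```

With the values from the response above, the token is usable straight away and stops working one hour later.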

Now that we have the JSON response, we can save the access token and the refresh token. I’m also saving the time values which tell me when my refresh token is ready and when it expires.

I’m not going to worry about refreshing the token in this blog post, as it takes a bit more work. I will be updating my code and writing a new blog post at a later date.

def get_token(code):
    :
    access_token = response.json()['Response']['accessToken']['value']
    refresh_token = response.json()['Response']['refreshToken']['value']
    refresh_ready = datetime.now() + timedelta(seconds=int(response.json()['Response']['refreshToken']['readyin']))
    refresh_expired = datetime.now() + timedelta(seconds=int(response.json()['Response']['refreshToken']['expires']))
    save_session(token_json)
    userSummary = GetCurrentBungieUser(oauth_session)
    return userSummary.json()['Response']['displayName']

We will use this to create an authorised HTTP session.

Creating our authorised session:

Now that we have our API key and our access token – we can create an authorised session. As before the API-Key is added to the session header “X-API-Key”. We also need to add an “Authorization” header with the value of our access token, here’s how the access token above would look: “Bearer  COoJEo … j8w==” (I’ve shortened it a little here).

Here’s the code to create the session:

def save_session(token_json):
    oauth_session = requests.Session()
    oauth_session.headers["X-API-Key"] = API_KEY
    oauth_session.headers["Authorization"] = 'Bearer ' + str(token_json)
    access_token = "Bearer " + str(token_json)
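
The same two headers can also be built as a plain dictionary, which is handy for checking them before they go anywhere near a session. This is a sketch only (Python 3); build_auth_headers is a hypothetical helper and both values below are placeholders.

```python
def build_auth_headers(api_key, access_token):
    """Return the two headers needed for an authorised request."""
    return {
        'X-API-Key': api_key,
        'Authorization': 'Bearer ' + access_token,
    }

headers = build_auth_headers('MY-API-KEY', 'placeholder-token')
print(headers['Authorization'])
```

A dictionary like this can be passed to oauth_session.headers.update() to get the same result as the two assignments above.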

Making an authorised request:

Now that we are authorised – we can try a simple GET request to the GetCurrentBungieAccount endpoint (as recommended in the Bungie Auth article), this will give us some information on the logged in character, from their Bungie account.

For this – I’ve created a new view template called “vault.html”, for now this is just a straight copy of the “index.html” template. In the future, I’ll populate this with all of the vault contents.

def vault():
    userSummary = GetCurrentBungieAccount(oauth_session)
    return render_template('vault.html', 
        character=userSummary.json()['Response']['user']['displayName'], 
        lightLevel = charSummary.json()['Response']['data']['characterBase']['stats']['STAT_LIGHT']['value'],
        emblemImage = account.json()['Response']['data']['characters'][0]['emblemPath'],
        backgroundImage = account.json()['Response']['data']['characters'][0]['backgroundPath'],
    )

Here’s what the code for GetCurrentBungieAccount looks like, as we’ve seen before, it’s the same Python Requests GET format, we are passing the “oauth_session”, that we saved a moment ago, into the function as a parameter. We are returning the JSON response to the vault() route.

def GetCurrentBungieAccount(session):
    req_string = 'https://www.bungie.net/Platform/User/GetBungieNetUser/'
    res = session.get(req_string)
    return res

I’ve also created a new function to return some character specific data using the getCharacterSummary endpoint. This will give me some data like my character’s light level and my character’s emblem.

def GetCharacterSummary(session):
    req_string = base_url + membershipType + "/Account/" + destinyMembershipId + "/Character/" + characterId + "/"
    res = session.get(req_string)
    return res

Viewing our web site:

As usual, the full set of code can be found on my GitHub page:

https://github.com/AllynH/Destiny_Flask_Webserver

There are a few files / folders, so you’ll need to download it from there.

To run the code you can type:

python app.py

This will start the Flask Web Server, you’ll see some output like this:

* Restarting with stat
Opening Manifest...
Finished!
 * Debugger is active!
* Running on https://127.0.0.1:5000/ (Press CTRL+C to quit)

First, click on the “Authenticate with Bungie” link and follow the instructions to authenticate your account.

Index page view:

Here is the view of the index page.

You should see a screen like this, asking you to review and approve the permissions required by this app.

Authorise your app.

Don’t forget to review what permissions you’re giving the app.

You should then be redirected back to the index page.

Now that you’ve been authorised and logged into Bungie via our app, we can open the link “view your vault contents”, you’ll see something like this:

Welcome to your vault - yet to be finished.

Gamertag, light level, Emblem and emblem background display.

Next steps:

Right now, we’ve created the basic shell of the website / app. This is a good start and we can build onto this, add features, create new pages.

The next steps for me will be to add more functionality to the website:

  • Character inventory view.
  • Xur inventory view.
  • Finish the refresh token flow, so users don’t need to authenticate every 30 mins.
  • Add a user database to securely store the users refresh tokens.
  • Add a transfer item feature.

All of the above is finished (except the transfer item feature), I just need to write up the blog post 🙂 I’m looking to deploy the code soon to a real live web site, so stay tuned.

 

Creating a Python app for Destiny – Part 5: Reading a characters inventory and vault contents.

Introduction:

In the previous sections I showed how to:

  1. Send a request to read Xurs inventory.
  2. Send a HTML formatted email with Xurs inventory.
  3. Log into Destiny and Bungie.net via your PSN account.
  4. Transfer items from your vault to your character and equip them.

In this section I’m going to lay the foundations of a full inventory management system. This section will detail how to read the list of items that are currently equipped on your character and how to read the contents of your vault.

I’ll also show how, by downloading the full Destiny Manifest, we can speed up our code and drastically reduce the amount of HTTP requests we need to send to Bungie.

Also, I’d like to say a big thanks to all of the people on Reddit, Twitter and Bungie.net for their help and interest 🙂

You can find me on Twitter here @Allyn_H_

The getCharacterInventory endpoint:

The BungieNetPlatform wiki for this endpoint can be found here. This endpoint will return a JSON response with a list of each of the equipped items for a given character ID. It works in a similar way to the Xur adviser endpoint (seen in parts 1 & 2). The request will return a series of hashes, and we’ll then need to send a separate request to the Destiny manifest for each one, in order to translate this data into a human readable format.

  • This is a public endpoint – no need to be logged in, anyone can use this.
  • This is a HTTP GET request – you don’t need to send a JSON package.
  • Your destinyMembershipId needs to be attached to the URL as a parameter – more info here.
  • Your characterId needs to be attached to the URL as a parameter – more info here.

Making the request:

First we build the endpoint URL and make the GET request:

base_url = "https://www.bungie.net/platform/Destiny/"
req_string = base_url + membershipType + "/Account/" + destinyMembershipId + "/Character/" + charId + "/Inventory"
res = session.get(req_string)

The request URL will look like this:

https://www.bungie.net/platform/Destiny/2/Account/4611686018436136301/Character/2305843009222916165/Inventory/
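
As a quick check of how the pieces fit together, here is the same URL built by a small function (Python 3 sketch; inventory_url is a hypothetical helper name). The IDs are the ones from the example URL above.

```python
BASE_URL = 'https://www.bungie.net/platform/Destiny/'

def inventory_url(membership_type, membership_id, character_id):
    """Build the getCharacterInventory URL from its three parameters."""
    return '{0}{1}/Account/{2}/Character/{3}/Inventory/'.format(
        BASE_URL, membership_type, membership_id, character_id)

url = inventory_url('2', '4611686018436136301', '2305843009222916165')
print(url)
```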

Parsing the response:

Next we need to parse through the multidimensional JSON response and pick out the relevant data:

for equipment in res.json()['Response']['data']['buckets']['Equippable']:
    for item in equipment['items']:
        print "itemHash is: \t\t", item['itemHash']
        print "itemInstanceId is: \t", item['itemInstanceId']
        hashReqString = base_url + "Manifest/6/" + str(item['itemHash'])
        res2 = requests.get(hashReqString, headers=HEADERS)
        item_name = res2.json()['Response']['data']['inventoryItem']['itemName']
        item_tier = res2.json()['Response']['data']['inventoryItem']['tierTypeName']
        item_type = res2.json()['Response']['data']['inventoryItem']['itemTypeName']
        item_icon = res2.json()['Response']['data']['inventoryItem']['icon']
        print "Item name is: " + item_name
        print "Item type is: " + item_tier + " " + item_type
        print "Item icon is: http://www.bungie.net" + item_icon

The above code will read through the large JSON response and print out the data we want. For each equipped item, we print out the item’s hash, its instance ID, its name, the tier type (common, legendary or exotic), its type and its icon. The dict item [‘Equippable’] is an array, so we need to loop through each piece of “equipment” and pick out each of the [‘items’] stored there.
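
The nesting is easier to see on a cut-down response. The sketch below (Python 3) uses a made-up sample with only the keys the loop touches, and a hypothetical function equipped_items that collects the hash and instance ID of every equipped item without making any requests. The two ID values are taken from the example output in this post.

```python
def equipped_items(inventory):
    """Return (itemHash, itemInstanceId) for every equipped item."""
    pairs = []
    for equipment in inventory['Response']['data']['buckets']['Equippable']:
        for item in equipment['items']:
            pairs.append((item['itemHash'], item['itemInstanceId']))
    return pairs

# Cut-down sample: one bucket holding one item, and one empty bucket.
sample = {'Response': {'data': {'buckets': {'Equippable': [
    {'items': [{'itemHash': 1703777169,
                'itemInstanceId': '6917529081613012276'}]},
    {'items': []},
]}}}}

print(equipped_items(sample))
```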

The code will return something like this:

itemHash is: 1703777169
itemInstanceId is: 6917529081613012276
Item name is: 1000-Yard Stare
Item type is: Legendary Sniper Rifle
Item icon is: http://www.bungie.net/common/destiny_content/icons/c1c49acd0fd146d7b32184f23c64dfe5.jpg

The GetVault endpoint:

The BungieNetPlatform wiki for this endpoint can be found here. This endpoint also works in a similar way to the Xur adviser endpoint (seen in parts 1 & 2) and the getCharacterInventory endpoint: it will return a series of hashes, and we’ll then need to send a separate request to the Destiny manifest for each one. The vault currently has space for 288 items, so if our vault was full we’d need to make 289 requests to the Bungie servers (1 request to the vault + 288 requests to look up the items) to get all the data we want! Making that many requests takes way too much time and eats way too much data, I’ll explain how to remove these requests in a bit 🙂

  • This is a private endpoint – you need to be logged in, with a persistent HTTP session.
  • This is a GET request – you don’t need to send a JSON package.
  • Your destinyMembershipId needs to be attached to the URL as a parameter – more info here.

Making the request:

The code for this request will look like this:

getVault_url = base_url + membershipType + "/MyAccount/Vault/"
vaultResult = session.get(getVault_url, params={'accountId': destinyMembershipId})
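
The params argument is turned into a query string on the end of the URL. As a sketch of what the final request URL looks like (Python 3, standard library only; vault_url is a hypothetical helper and the membership ID is the one from the earlier example URL):

```python
from urllib.parse import urlencode

def vault_url(membership_type, membership_id):
    """Build the GetVault URL, including the accountId query parameter."""
    base_url = 'https://www.bungie.net/platform/Destiny/'
    query = urlencode({'accountId': membership_id})
    return base_url + membership_type + '/MyAccount/Vault/?' + query

print(vault_url('2', '4611686018436136301'))
```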

Parsing the response:

Next we need to parse through the multidimensional JSON response and pick out the relevant data:

for bucket in vaultResult.json()['Response']['data']['buckets']:
    for item in bucket['items']:
        print item['itemHash']
        print item['itemInstanceId']
        hashReqString = base_url + "Manifest/6/" + str(item['itemHash'])
        res2 = requests.get(hashReqString, headers=HEADERS)
        myItem = item['itemHash']
        item_name = res2.json()['Response']['data']['inventoryItem']['itemName']
        item_tier = res2.json()['Response']['data']['inventoryItem']['tierTypeName']
        item_type = res2.json()['Response']['data']['inventoryItem']['itemTypeName']
        item_icon = res2.json()['Response']['data']['inventoryItem']['icon']
        print "Item name is: " + item_name
        print "Item type is: " + item_tier + " " + item_type
        print "Item icon is: http://www.bungie.net" + item_icon

The above code will read through the large JSON response and print out the data we want. For each item in our vault, we print out the item’s hash, its instance ID, its name, the tier type (common, legendary or exotic), its type and its icon. The dict item [‘buckets’] is an array, so we need to loop through each “bucket” and pick out each of the [‘items’] stored there. Each entry in [‘buckets’] is a storage category in your vault, such as Armour, Weapons, Consumables, Shaders, Emblems, etc. Each of the [‘items’] refers to an item stored in that vault space.

It’s also important to remember, we need to collect the “itemInstanceId” for each item, as this is required to transfer the item to and from the vault.

hashReqString = base_url + "Manifest/6/" + str(item['itemHash'])
res2 = requests.get(hashReqString, headers=HEADERS)

The lines above make the request to the manifest that translates an “itemHash” into a human readable format. This request is made for each of the items in our vault. This is the real time consuming part of the program.

Downloading the manifest:

So we’ve figured out how to find our vault contents and list the equipped items on our characters. We’ve also seen, for decrypting and reading the contents of our vault, we need to make 289 requests (more if Bungie ever decide to increase the vault capacity). This takes a huge chunk of time. I measured the amount of time it took to read my inventory and to send 267 requests (I had 266 items in my inventory), the script took 2 mins and 43 seconds to finish!
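
Using only the numbers quoted above, a quick back-of-the-envelope calculation (Python 3 sketch, with a hypothetical helper name) gives the average cost of one request and a rough estimate for a completely full vault:

```python
def seconds_per_request(total_seconds, request_count):
    """Average time taken by a single request."""
    return total_seconds / request_count

measured = 2 * 60 + 43            # 2 mins and 43 seconds, from the run above
per_request = seconds_per_request(measured, 267)

# A full vault needs 1 request for the vault plus 288 item lookups.
full_vault = per_request * (1 + 288)
print(round(per_request, 2), round(full_vault))
```

That works out at roughly 0.6 seconds per request, so a full vault would take close to three minutes. This is only an average from a single run, so treat it as an estimate rather than a benchmark.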

So in order to speed things up, I downloaded the Destiny Manifest database onto my local computer, the code and directions can be found here on the destiny devs page.

The code can be downloaded here.

Once downloaded you can execute the code like so:

python manifest_destiny.py

The output looks something like this:

Output of the script used to download the Destiny Manifest.

This will download the full Manifest and store it in a Pickle file. This is also my first time using Pickle files, so I assume this is done as it’s easier to get the data from a Pickle file than an SQL database.

Here’s what they look like:

Copies of the Destiny Manifest and the Manifest as a Pickle file.

The files are 53MB and 71MB, so they’re big enough. The file sizes can be reduced by commenting out any of the Manifest items you don’t need – for example I downloaded a second copy of the Manifest with only the “itemHash” data included.

Here’s how I edited the code:

Changing the code to download only the itemHash data.

Here are the file sizes:

Copies of the itemHash manifest and Pickle file.

We can see here, the Pickle file we need has reduced from 71MB to 37MB. This would also further reduce the run time of our programs. As memory isn’t really an issue for me I will be sticking to the full Manifest.

Reading the vault contents using the manifest:

Now that we have saved the Manifest as a Pickle file, we can pull the required data directly from this file instead of needing to make hundreds of HTTP requests every time we want to read our inventory or our vault contents.

To import the data from our Pickle file we use the following code:

import pickle

with open('manifest.pickle', 'rb') as data:
    all_data = pickle.load(data)

Now the object all_data will contain the entire Manifest contents.

Previously we made requests to the Manifest via a HTTP request:

hashReqString = base_url + "Manifest/6/" + str(item['itemHash'])
res2 = requests.get(hashReqString, headers=HEADERS)
myItem = item['itemHash']
item_name = res2.json()['Response']['data']['inventoryItem']['itemName']

Here we pass the parameter “6”, which refers to the definition type “InventoryItem”, along with the specific “itemHash”.

Now we can read the item information from the nested dict object stored in all_data:

inventoryItem = all_data['DestinyInventoryItemDefinition'][item['itemHash']]
item_name = inventoryItem['itemName']
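
To try the lookup without downloading the real Manifest, the sketch below (Python 3) pickles a tiny stand-in dictionary with the same nesting and reads it back. The single entry reuses the item from the example output earlier in this post; the temporary file location and the helper name lookup_item are only for illustration.

```python
import os
import pickle
import tempfile

# Tiny stand-in for the real Manifest, with the same nesting.
manifest = {'DestinyInventoryItemDefinition': {
    1703777169: {'itemName': '1000-Yard Stare',
                 'tierTypeName': 'Legendary',
                 'itemTypeName': 'Sniper Rifle'},
}}

# Write the stand-in to a temporary manifest.pickle file.
path = os.path.join(tempfile.mkdtemp(), 'manifest.pickle')
with open(path, 'wb') as handle:
    pickle.dump(manifest, handle)

# Load it back exactly as shown above.
with open(path, 'rb') as handle:
    all_data = pickle.load(handle)

def lookup_item(all_data, item_hash):
    """Return the item definition for a hash, with no HTTP request needed."""
    return all_data['DestinyInventoryItemDefinition'][item_hash]

item = lookup_item(all_data, 1703777169)
print(item['itemName'], '-', item['tierTypeName'], item['itemTypeName'])
```

Each lookup is now a dictionary access in memory, which is where the run time saving comes from.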

Updating the code to pull from our local Manifest copy:

Here’s how I modified the code used to parse our vault to pull the data directly from our manifest.pickle file instead of making a HTTP request:

for bucket in vaultResult.json()['Response']['data']['buckets']:
    for item in bucket['items']:
        weapon_list[array_size]['itemReferenceHash'] = item['itemHash']
        weapon_list[array_size]['itemId'] = item['itemInstanceId']
        inventoryItem = all_data['DestinyInventoryItemDefinition'][item['itemHash']]
        item_name = inventoryItem['itemName']
        item_tier = inventoryItem['tierTypeName']
        item_type = inventoryItem['itemTypeName']
        item_icon = inventoryItem['icon']
        print "Item name is: " + item_name
        print "Item type is: " + item_tier + " " + item_type
        print "Item icon is: http://www.bungie.net" + item_icon

Run time improvements:

We can see from the table below that downloading the Manifest and removing the need for ~290 additional HTTP requests can drop the run time from 220 seconds down to 16 seconds. A lot of those 16 seconds seem to be spent parsing the Pickle file, which again could be optimised by either using the raw SQL data or storing it in some other format. For now though, the convenience of using the Pickle file far outweighs the downsides.

Manifest type                                Run time
Multiple HTTP requests to Bungie Manifest    220.0 seconds
manifest.pickle – full download              22.3 seconds
item.pickle – only item                      16.2 seconds
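
For a sense of scale, the run times in the table can be turned into speed-up factors. This is a small Python 3 sketch that uses only the three measurements above:

```python
run_times = {
    'Multiple HTTP requests to Bungie Manifest': 220.0,
    'manifest.pickle - full download': 22.3,
    'item.pickle - only item': 16.2,
}

baseline = run_times['Multiple HTTP requests to Bungie Manifest']

# How many times faster each option is than making every HTTP request.
speedups = {name: baseline / seconds for name, seconds in run_times.items()}

for name, factor in speedups.items():
    print('{0}: {1:.1f}x'.format(name, factor))
```

So the full Pickle file is roughly 10 times faster than the HTTP version, and the item-only file is between 13 and 14 times faster.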

Viewing the vault contents as a HTML file:

As a quick and easy way of outputting the vault contents into a HTML file, I used the same template file and code format as used in the guide for emailing Xurs inventory.

Here are the steps:

  • Append the HTML header to the top of the my_html string.
  • Modify the code used to parse the vault contents to output a HTML formatted string.
  • Append the HTML footer to the bottom of the my_html string.
  • Concatenate the header, vault contents and footer HTML strings into the my_html string and output this to a file called vault_contents.html.

Parsing the response as HTML:

for bucket in vaultResult.json()['Response']['data']['buckets']:
    for item in bucket['items']:
        inventoryItem = all_data['DestinyInventoryItemDefinition'][item['itemHash']]
        item_name = inventoryItem['itemName']
        item_tier = inventoryItem['tierTypeName']
        item_type = inventoryItem['itemTypeName']
        # The icon path already starts with "/", so don't add a second one.
        item_icon = "http://www.bungie.net" + inventoryItem['icon']
        print "Item name is: " + item_name
        array_size += 1
        print "Item type is: " + item_tier + " " + item_type + "\n"
        my_html = my_html + "\t\t<div class=\"col-md-4\">\n"
        my_html = my_html + "\t\t\t<div class=\"thumbnail\">\n"
        my_html = my_html + "\t\t\t\t<a href=\"" + item_icon + "\">\n"
        my_html = my_html + "\t\t\t\t<img src=\"" + item_icon + "\">\n"
        my_html = my_html + "\t\t\t\t</a>\n"
        my_html = my_html + "\t\t\t\t<h3>" + item_name + "</h3>\n"
        my_html = my_html + "\t\t\t\t<p>" + item_tier + " " + item_type + "</p>\n"
        my_html = my_html + "\t\t\t</div>\n"
        my_html = my_html + "\t\t</div>\n"
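
The string building above can also be written as a function that returns the block for one item. The sketch below is a Python 3 alternative, not the code from the repo: item_block is a hypothetical name, the lines are joined in one go, and html.escape from the standard library is applied to each value in case an item name ever contains characters like < or &.

```python
from html import escape

def item_block(item_name, item_tier, item_type, item_icon):
    """Return the HTML for one vault item, escaping every inserted value."""
    lines = [
        '\t\t<div class="col-md-4">',
        '\t\t\t<div class="thumbnail">',
        '\t\t\t\t<a href="' + escape(item_icon) + '">',
        '\t\t\t\t<img src="' + escape(item_icon) + '">',
        '\t\t\t\t</a>',
        '\t\t\t\t<h3>' + escape(item_name) + '</h3>',
        '\t\t\t\t<p>' + escape(item_tier) + ' ' + escape(item_type) + '</p>',
        '\t\t\t</div>',
        '\t\t</div>',
    ]
    return '\n'.join(lines) + '\n'

# The example item from earlier in this post.
my_html = item_block(
    '1000-Yard Stare', 'Legendary', 'Sniper Rifle',
    'http://www.bungie.net/common/destiny_content/icons/'
    'c1c49acd0fd146d7b32184f23c64dfe5.jpg')
print(my_html)
```

Inside the vault loop, each call’s result would be appended to my_html between the header and footer strings.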

Here’s what the output of the code looks like:

Even have the VoG boots. #Y1RaidStruggles

Vault contents as formatted HTML.

Running the code:

The full set of Python code can also be found on my GitHub page here: https://github.com/AllynH/Destiny_Read_Characters_Inventory_Vault

As there are a few files, you’ll need to copy them from there. To make the code work for you, you’ll need to enter your username, password, api_key, destinyMembershipId and characterId in Header_file.py.

Here are the values you’ll need to change:

# PSN Username:
username = "emailaddr"
password = "mypassword"

# Destiny API X-Key:
API_KEY = ""

# Destiny parameters:
membershipType = "2" # PS4 = 2
destinyMembershipId = ""

characterId = ""

You can then run the code like so:

> python Read_Inventory_and_Vault.py

As always, I’ll try to keep the GitHub repo up to date with any changes I make.

Next steps:

The code as it stands does what we want: it reads the contents of our vault and prints it to a HTML file. It’s not really a fully functioning inventory management system, though.

To create a fully working web application, the code will need to be built into a web framework. Luckily for me, I’ve been reading up on the Python Flask web framework for the last few weeks 🙂

Creating a Python app for Destiny – Part 4: Transferring and equipping items.

Introduction:

In the previous sections I showed how to:

  1. Send a request to read Xur’s inventory.
  2. Send a HTML formatted email with Xur’s inventory.
  3. Log into Destiny and Bungie.net via your PSN account.

I’m now going to build on the previously created code to create an app that can transfer an item to and from the vault. Once the item has been transferred, we can then equip it.

If you’ve been reading along with me up to this point, you should understand all of this. If not, now is a good time to check out my previous posts.

Here’s the code working (sorry about the camera shake, it was made using my phone):

Destiny the game, Equipping an Item with the Destiny API. from Allyn H on Vimeo.

Also, I’d like to say a big thanks to all of the people on Reddit, Twitter and Bungie.net for their help and interest 🙂

You can find me on Twitter here @Allyn_H_

The transferItem endpoint:

The Destiny API uses web server endpoints to execute commands. The endpoint used to transfer items is: https://www.bungie.net/platform/Destiny/transferItem/

The BungieNetPlatform wiki for this endpoint can be found here. We can see this method is very different from the Xur advisors endpoint we used in the previous examples.

Here are the main differences:

  • We still need to use the X-API key in our header.
  • We also have to attach our x-csrf token in our header.
  • This is a private endpoint – we need to be logged in to use this method.
    • We need to attach the required cookies to our POST request.
  • This is a POST method – we will need to send data with this request.
    • We need to send the data as a JSON packet.
  • The response will only tell us if we have been successful or not – the Xur advisors response was >1000 lines of text.

The code for storing the transferItem endpoint variable will look like this:

base_url = "https://www.bungie.net/platform/Destiny/"
req_string = base_url + "TransferItem/"

Important note: If the trailing “/” is missing from the “transferItem/” endpoint – the request will fail. I’ve spent way longer than I’m willing to admit debugging that issue :/
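
To avoid being bitten by the trailing “/” again, the URL and the headers can be built by two small helpers. This is only a sketch: the helper names are made up, and the header names “X-API-Key” and “x-csrf” are assumed spellings of the two header values described in the list above.

```python
BASE_URL = "https://www.bungie.net/platform/Destiny/"


def endpoint_url(name, base_url=BASE_URL):
    """Join the base URL and an endpoint name, always ending in one '/'."""
    return base_url.rstrip("/") + "/" + name.strip("/") + "/"


def build_headers(api_key, csrf_token):
    """Return the two header values the private endpoints ask for.

    The header names used here are assumptions, not confirmed by this post.
    """
    return {"X-API-Key": api_key, "x-csrf": csrf_token}


# The trailing slash is added whether or not the caller remembered it.
req_string = endpoint_url("TransferItem")
same_req_string = endpoint_url("TransferItem/")
```

Both calls produce the same URL, so a forgotten “/” no longer changes the request.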

Creating the POST request payload:

The “transferItem” endpoint takes a JSON payload as an input. This JSON payload contains a lot of information, so let’s take a look at how it’s put together.

The data gathered to populate this payload was found by looking at items I had equipped using the “GetCharacterInventory” endpoint. You can find more info here. This will list the relevant information for all of your equipped items. Let me know if you want to see the code for this.

Name – Description

  • membershipType – This will be 2 for PSN and 1 for Xbox Live.
  • itemReferenceHash – This is the “itemHash” number from our previous examples. This refers to the generic item, for example: 1000-Yard Stare.
  • itemId – This is a unique item number relating to a specific item that you own. For example: Allyn’s 385 light level 1000-Yard Stare with the Ambush Scope, Quickdraw and Firefly.
  • stackSize – How many items to transfer. Should be “1” for equippable items.
  • characterId – The characterId the item is being moved to or from. You can find the characterId here.
  • transferToVault – Move the item to or from the vault; true or false.

Here’s how this method expects the JSON data to be formatted:

This is the BungieNetPlatform example for the transferItem POST payload

The good news here is it’s pretty easy to put this together in Python by creating a Dictionary object.

Making a POST request:

We’ve already looked at making HTTP GET requests with Python, but the Python Requests library also makes it really easy to make a POST request.

You can view the Requests documentation here. Here’s the quick example of how to send some data formatted as a Python dictionary:

Python Requests POST example

Alternatively, the Requests library will automatically encode a Python dict as JSON for you when you use the “json” parameter:

Python Requests POST JSON example
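
As a rough sketch of the options described above, here are the three ways a dictionary can be handed to post(). So that it runs without a network connection, RecordingSession is a hypothetical stand-in for requests.Session that only records the arguments it is given, and the URL is a placeholder.

```python
import json


class RecordingSession(object):
    """Hypothetical stand-in for requests.Session: records the keyword
    arguments of each post() call instead of sending anything."""

    def __init__(self):
        self.calls = []

    def post(self, url, **kwargs):
        self.calls.append((url, kwargs))


URL = "https://example.invalid/post"  # placeholder address
fields = {"key1": "value1", "key2": "value2"}

session = RecordingSession()

# 1. data=<dict>: Requests form-encodes the dict (key1=value1&key2=value2).
session.post(URL, data=fields)

# 2. data=<str>: a string is sent as the request body unchanged, so the
#    dict is serialised to JSON by hand first.
session.post(URL, data=json.dumps(fields))

# 3. json=<dict>: Requests serialises the dict itself and sets the
#    Content-Type header to application/json.
session.post(URL, json=fields)
```

With the real library, session would be a requests.Session() object. The “json” parameter needs Requests 2.4.2 or later; on older versions the second form does the same job.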

Now that we know how to create a Python POST request and what data the transferItem endpoint is looking for, let’s turn that into some code!

Moving an item from your vault to your inventory:

As mentioned above – the data gathered to populate this payload was found by looking at items I had equipped by using the “GetCharacterInventory” endpoint, you can find more info here. I’m planning on putting together a blog post on this too but let me know if you’d like to see the code for this.

First we create our Python dictionary object:

text_payload = {
    "membershipType": 2,
    "itemReferenceHash": 1519376148, # The Ram
    "itemId": 6917529085991104887, # The Ram
    "characterId": characterId_Warlock,
    "stackSize": 1,
    "transferToVault": False
}

Next, we convert the dictionary object into a JSON string:

payload = json.dumps(text_payload)

Now we can make the actual POST request to the Destiny servers to transfer our item.

base_url = "https://www.bungie.net/platform/Destiny/"
req_string = base_url + "TransferItem/"
res = session.post(req_string, data=payload)

The JSON response should look like this:

{
    "ThrottleSeconds": 0, 
    "ErrorCode": 1, 
    "ErrorStatus": "Success", 
    "Message": "Ok", 
    "Response": 0, 
    "MessageData": {}
}

If the item was not found in your vault, you’ll get a response like this:

{
    "ThrottleSeconds": 0, 
    "ErrorCode": 1623, 
    "ErrorStatus": "DestinyItemNotFound", 
    "Message": "The item requested was not found.", 
    "Response": 0, 
    "MessageData": {}
}

For any other issues, for example if you omit the trailing “/” from the “https://www.bungie.net/platform/Destiny/TransferItem/” URL, the request will fail and you won’t receive a JSON response. Instead, you’ll receive a HTTP 405 status code. You can find the status code by using the following code:

print res.status_code
405

If you see something like that coming up, you’ll need to review and fix your code.
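
The three outcomes above (success, a Destiny error and a plain HTTP failure) can be told apart with one small function. This is a sketch under assumptions: the function name and message formats are made up, and FakeResponse is a hypothetical stand-in that only mimics the status_code attribute and json() method of a Requests response.

```python
import json


def summarise(res):
    """Turn a platform reply into a one-line summary."""
    try:
        body = res.json()
    except ValueError:
        # No JSON came back, e.g. the 405 caused by a missing trailing "/".
        return "HTTP %d: no JSON body returned" % res.status_code
    if body.get("ErrorCode") == 1:
        return "Success"
    return "%s (%s): %s" % (
        body.get("ErrorStatus"), body.get("ErrorCode"), body.get("Message")
    )


class FakeResponse(object):
    """Hypothetical stand-in for a Requests response object."""

    def __init__(self, status_code, text):
        self.status_code = status_code
        self.text = text

    def json(self):
        # json.loads raises ValueError when the text is not valid JSON.
        return json.loads(self.text)


ok = FakeResponse(
    200, '{"ErrorCode": 1, "ErrorStatus": "Success", "Message": "Ok"}'
)
missing = FakeResponse(
    200,
    '{"ErrorCode": 1623, "ErrorStatus": "DestinyItemNotFound", '
    '"Message": "The item requested was not found."}',
)
bad_url = FakeResponse(405, "Method Not Allowed")
```

In the real script the same function would be called as summarise(res) straight after session.post().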

Equipping an item from your inventory:

Now that the item is in our character’s inventory, we can equip the item on that character.

The endpoint used to equip an item from your inventory is https://www.bungie.net/platform/Destiny/EquipItem/ and you can find more information about this endpoint here.

Just a note: this can only be done when in a social space or in orbit (otherwise it’d be really easy to cheat in PvP – can you imagine if you could do this in Trials???). Otherwise the action will fail and return an error message.

Creating the POST request payload:

The POST request to equip an item is smaller and we already have the details of the item we want to equip.

text_equip_payload = {
    "membershipType": 2,
    "itemId": 6917529085991104887, # The Ram
    "characterId": characterId_Warlock
}
equip_payload = json.dumps(text_equip_payload)

equip_url = base_url + "EquipItem/"
res = session.post(equip_url, data=equip_payload)

Again, if the item was not found, you’ll get a response like this:

{
    "ThrottleSeconds": 0, 
    "ErrorCode": 1623, 
    "ErrorStatus": "DestinyItemNotFound", 
    "Message": "The item requested was not found.", 
    "Response": 0, 
    "MessageData": {}
}

If you’re trying to equip an exotic and you already have one equipped, you’ll see an error message like this:

{
    "ThrottleSeconds": 0, 
    "ErrorCode": 1641, 
    "ErrorStatus": "DestinyItemUniqueEquipRestricted", 
    "Message": "You can only have one item of this type equipped.", 
    "Response": 0, 
    "MessageData": {}
}
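
Putting the two requests together, the equip step only makes sense if the transfer succeeded. The sketch below chains them and stops at the first error. The function names are made up, and ScriptedSession and ScriptedReply are hypothetical stand-ins for the logged-in session, replaying canned replies so the example runs offline.

```python
import json

BASE_URL = "https://www.bungie.net/platform/Destiny/"
SUCCESS = 1  # ErrorCode reported in the successful response shown earlier


def post_payload(session, endpoint, payload):
    """POST the payload as a JSON string and return the parsed reply."""
    res = session.post(BASE_URL + endpoint, data=json.dumps(payload))
    return res.json()


def transfer_then_equip(session, membership_type, item_hash, item_id,
                        character_id):
    """Move one item out of the vault, then equip it if that worked.

    Returns a (step, ErrorStatus) tuple for the last step attempted.
    """
    transfer = post_payload(session, "TransferItem/", {
        "membershipType": membership_type,
        "itemReferenceHash": item_hash,
        "itemId": item_id,
        "characterId": character_id,
        "stackSize": 1,
        "transferToVault": False,
    })
    if transfer["ErrorCode"] != SUCCESS:
        return ("transfer", transfer["ErrorStatus"])

    equip = post_payload(session, "EquipItem/", {
        "membershipType": membership_type,
        "itemId": item_id,
        "characterId": character_id,
    })
    return ("equip", equip["ErrorStatus"])


class ScriptedReply(object):
    """Hypothetical reply object exposing only json()."""

    def __init__(self, body):
        self.body = body

    def json(self):
        return self.body


class ScriptedSession(object):
    """Hypothetical session that replays canned replies in order."""

    def __init__(self, bodies):
        self.bodies = list(bodies)
        self.urls = []

    def post(self, url, data=None):
        self.urls.append(url)
        return ScriptedReply(self.bodies.pop(0))


# Transfer succeeds, equip is refused because an exotic is already equipped.
session = ScriptedSession([
    {"ErrorCode": 1, "ErrorStatus": "Success"},
    {"ErrorCode": 1641, "ErrorStatus": "DestinyItemUniqueEquipRestricted"},
])
outcome = transfer_then_equip(
    session, 2, 1519376148, 6917529085991104887, "character-id-placeholder"
)
```

Returning the step name alongside the ErrorStatus makes it clear whether the item is still in the vault or is sitting unequipped in the character’s inventory.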

Running the code:

Here is the full set of Python code. It can be copied into a file called “equipItem.py” and executed from the command prompt like so:

> python equipItem.py

The code can also be found on my GitHub page here: https://github.com/AllynH/Destiny_Equip_Item

As always, I’ll try to keep the GitHub repo up to date with any changes I make.

Making a Twitter-Bot on your Galileo or Raspberry Pi

Twitter have developed an API (Application Programming Interface) for their website, which makes it really easy to send and receive Tweets from your Raspberry Pi or Galileo! The Twitter API takes all of the hard work out of writing a program to interface with Twitter. There are several ways to access the Twitter API, the easiest of which (in my opinion anyway 🙂) is to use the Twython package for the Python language.

Differences between Raspberry Pi and the Galileo:

It’s really easy to install Twython on the Raspberry Pi, a little harder to install on the Galileo – so for that reason, I’ll show the step-by-step instructions from the Galileo install. The only difference is the Galileo doesn’t require you to use the sudo command as you already have root permissions set. For example, when editing a file with the Galileo you would use:

# nano my_file.txt

When editing the file on a Raspberry Pi you would use:

# sudo nano my_file.txt

Changing the date on the Galileo:

In order to install some of these packages, you’ll need to update the date and time of your Galileo – this isn’t automatically done when you connect to the internet as you may expect. If you don’t update your date and time, you’ll get SSL certificate errors when you try to download the Twython package.

To change the date and time – use the following command in the format year-month-day hour:minute:second:

# date --set="2015-01-20 10:00:00"

Installing Twython (and other Python packages):

In order to connect our computer to Twitter, we’ll need to download some Python packages:

  1. setuptools – this will allow you to “Easily download, build, install, upgrade, and uninstall Python packages.”
  2. pip – this is a Python package installer.
  3. twython – this is the package which will actually interface with Twitter.

Here are the commands to install these packages – remember if you’re on a Raspberry Pi you’ll need to put “sudo” in front of each command.

# apt-get install python-setuptools

Here’s what that looks like on your computer. When asked “do you want to continue”, as per the picture below, enter “y” to continue.

Package 1 of 3.

Installing the setuptools Python package.

Next, install the “pip” package:

# easy_install pip

Finally, using pip – install the Twython package:

# pip install twython

 

Creating a Twitter App:

To create a Twitter App, you’ll need to sign up to Twitter and register the account as an application. This is important as you’ll need to verify this App with Twitter every time you use it. Twitter uses OAuth to verify your App (the four keys used below are OAuth 1.0a credentials). This means you’ll never have to supply your password to 3rd party App developers, but it does make it a little harder to verify your App. The good news is that Twython and other API libraries handle all the OAuth pain; all you need to do is register your App and save the information. Register your app here: https://apps.twitter.com/ Click on “Create New App” and enter your information.

Let the fun begin!

Creating an App.

 

Changing App permissions:

As this is your App and you’ll want to be able to play around with it, you can change the App permissions to allow you to read, write and access your direct messages. You can change these permissions again at a later stage.

Change permissions.

Authorize your account:

You need to create your access token and access token secret before you use your App. Click on the “Keys and access tokens” tab. Click on the “Create my access token” button.

Generate your secret token.

You should now have all 4 pieces of required information:

  1. Consumer Key (API key).
  2. Consumer Secret (API Secret).
  3. Access Token.
  4. Access Token Secret.

Now it’s time to write some Python!

Writing the Python:

Creating a Python file:

On your Raspberry Pi or Galileo, create a file called “Tweet.py” using the following command:

# nano Tweet.py

Now paste in the Python code:

Twitter Authorization:

In order to send a Tweet, you’ll need to send Twitter your OAuth information. This process is handled by the Twython package. Here we are creating 4 string objects and a Twython object called “twitter”. When we create the Twython object we pass the 4 strings to it as arguments. These are the access keys you generated in the previous section.

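from twython import Twython

# Replace the placeholders with the 4 values from the "Keys and access tokens" tab.
CONSUMER_KEY = '<YOUR CONSUMER_KEY>'
CONSUMER_SECRET = '<YOUR CONSUMER_SECRET>'
ACCESS_KEY = '<YOUR ACCESS_KEY>'
ACCESS_SECRET = '<YOUR ACCESS_SECRET>'
twitter = Twython(CONSUMER_KEY, CONSUMER_SECRET, ACCESS_KEY, ACCESS_SECRET)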

Reading in a command line argument:

To make the code a bit more flexible, we can pass the text we want to Tweet into the Python script as a command line argument. This is done by using the “argv” list from Python’s sys module. In our case – we only want to take the first 140 characters of this text, as this is the character limit set by Twitter for each Tweet. We do that by using the command:

sys.argv[1][:140]
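
Tying the argument handling to the Twython object, the rest of Tweet.py might look something like the sketch below. The function names are made up for illustration; update_status() is the Twython call that posts a Tweet, and FakeTwitter is a hypothetical stand-in for the Twython object so the sketch runs without real keys.

```python
TWEET_LIMIT = 140  # the character limit mentioned above


def tweet_text(argv):
    """Return the first command line argument, cut to the Tweet limit."""
    if len(argv) < 2:
        raise SystemExit('Usage: python Tweet.py "your message"')
    return argv[1][:TWEET_LIMIT]


def send_tweet(twitter, argv):
    """Post the command line text using the given Twython-like object."""
    text = tweet_text(argv)
    twitter.update_status(status=text)
    return text


class FakeTwitter(object):
    """Hypothetical stand-in for the Twython object: stores what it is sent."""

    def __init__(self):
        self.sent = []

    def update_status(self, status):
        self.sent.append(status)


fake = FakeTwitter()
sent = send_tweet(fake, ["Tweet.py", "Hello world."])
```

In the real script the call would be send_tweet(twitter, sys.argv), with “import sys” at the top of the file and “twitter” being the Twython object created in the previous section.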

Executing the code:

You can execute the code from the command line like so:

# python Tweet.py "Hello world."

Here’s what that looks like on the Galileo:

First Tweet!

Hello World!

Then check your Twitter Bot!

© 2024 Allyn H
