-
Notifications
You must be signed in to change notification settings - Fork 235
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
10 changed files
with
732 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
#!/usr/bin/env python | ||
"""A command line application to install apps on Android-enabled Sony cameras""" | ||
|
||
import argparse | ||
import json | ||
import os | ||
|
||
from pmca import installer | ||
from pmca.api import * | ||
from pmca.usb import * | ||
from pmca.usb.driver import * | ||
from pmca.usb.sony import * | ||
|
||
def printStatus(status): | ||
"""Print progress""" | ||
print '%s %d%%' % (status.message, status.percent) | ||
|
||
def switchToAppInstaller(dev): | ||
"""Switches a camera in MTP mode to app installation mode""" | ||
print 'Switching to app install mode. Please run this command again when the camera has switched modes.' | ||
dev.switchToAppInstaller() | ||
|
||
def installApp(dev, api, apkFile=None, outFile=None): | ||
"""Installs an app on the specified device. The apk is uploaded to the specified WebApi.""" | ||
print 'Creating task' | ||
# Upload apk (if any), start task | ||
task = api.startBlobTask(api.uploadBlob(apkFile.read())) if apkFile else api.startTask() | ||
xpdData = api.getXpd(task) | ||
|
||
print 'Starting communication' | ||
# Point the camera to the web api | ||
result = installer.install(dev, xpdData, printStatus) | ||
if result.code != 0: | ||
raise Exception('Communication error %d: %s' % (result.code, result.message)) | ||
|
||
print 'Downloading task' | ||
# Get the result from the website | ||
result = api.getTask(task) | ||
if not result['completed']: | ||
raise Exception('Task was not completed') | ||
result = result['response'] | ||
|
||
print 'Task completed successfully' | ||
|
||
if outFile: | ||
print 'Writing to output file' | ||
json.dump(result, outFile, indent=2) | ||
|
||
|
||
def installCommand(url, driverName=None, apkFile=None, outFile=None): | ||
"""Install command main""" | ||
if not driverName: | ||
# On Windows, we default to wpd, user libusb for other systems | ||
driverName = 'wpd' if os.name == 'nt' else 'libusb' | ||
|
||
# Import the specified driver | ||
if driverName == 'libusb': | ||
print 'Using libusb' | ||
import pmca.usb.driver.libusb as driver | ||
elif driverName == 'wpd': | ||
print 'Using Windows Portable Device Api (WPD)' | ||
import pmca.usb.driver.wpd as driver | ||
else: | ||
raise Exception('Unknown driver') | ||
|
||
print 'Looking for Sony MTP devices' | ||
# Scan for MTP devices | ||
devices = [dev for dev in driver.listDevices() if dev.type == USB_CLASS_PTP and dev.idVendor == SONY_ID_VENDOR] | ||
|
||
if not devices: | ||
print 'No MTP devices found. Ensure your camera is connected in MTP mode.' | ||
|
||
for device in devices: | ||
print '\nQuerying %s' % device.product | ||
# Get device info | ||
drv = driver.MtpDriver(device) | ||
info = MtpDevice(drv).getDeviceInfo() | ||
|
||
if isSonyMtpCamera(info): | ||
print '%s is a camera in MTP mode' % info.model | ||
switchToAppInstaller(SonyMtpCamera(drv)) | ||
elif isSonyMtpAppInstaller(info): | ||
print '%s is a camera in app install mode' % info.model | ||
installApp(SonyMtpAppInstaller(drv), WebApi(url), apkFile, outFile) | ||
|
||
|
||
def main(): | ||
"""Command line main""" | ||
parser = argparse.ArgumentParser() | ||
subparsers = parser.add_subparsers(dest='command', title='commands') | ||
install = subparsers.add_parser('install', description='Installs an apk file on the camera connected via USB. The connection can be tested without specifying a file.') | ||
install.add_argument('-u', dest='url', help='specify the web api base url', default='https://sony-pmca.appspot.com') | ||
install.add_argument('-d', dest='driver', choices=['libusb', 'wpd'], help='specify the driver') | ||
install.add_argument('-o', dest='outFile', type=argparse.FileType('w'), help='write the output to this file') | ||
install.add_argument('-f', dest='apkFile', type=argparse.FileType('rb'), help='the apk file to install') | ||
|
||
args = parser.parse_args() | ||
if args.command == 'install': | ||
installCommand(args.url, args.driver, args.apkFile, args.outFile) | ||
|
||
|
||
if __name__ == '__main__': | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
import requests | ||
|
||
class WebApi: | ||
"""A simple interface for the appengine website""" | ||
def __init__(self, baseUrl): | ||
self.base = baseUrl | ||
|
||
def uploadBlob(self, data, name='app.apk'): | ||
"""Uploads a blob, returns its key""" | ||
url = requests.get(self.base + '/ajax/upload').json()['url'] | ||
return requests.post(url, files={'file': (name, data)}).json()['key'] | ||
|
||
def startTask(self, urlSuffix=''): | ||
"""Starts a task sequence and returns its id""" | ||
return str(requests.get(self.base + '/ajax/task/start' + urlSuffix).json()['id']) | ||
|
||
def startBlobTask(self, key): | ||
"""Starts the installation task of a previously uploaded apk blob""" | ||
return self.startTask('/blob/' + key) | ||
|
||
def getXpd(self, task): | ||
"""Downloads the xpd file for a task""" | ||
return requests.get(self.base + '/camera/xpd/' + task).text.encode('ascii','ignore') | ||
|
||
def getTask(self, task): | ||
"""Returns the task result""" | ||
return requests.get(self.base + '/ajax/task/get/' + task).json() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
"""Manages the communication between camera, PC and appengine website during app installation""" | ||
|
||
from collections import namedtuple | ||
import json | ||
import select | ||
import socket | ||
|
||
from ..usb.sony import * | ||
from .. import xpd | ||
|
||
Response = namedtuple('Response', 'protocol, code, status, headers, data') | ||
Request = namedtuple('Request', 'protocol, method, url, headers, data') | ||
|
||
Status = namedtuple('Status', 'code, message, percent, totalSize') | ||
Result = namedtuple('Result', 'code, message') | ||
|
||
def _buildRequest(endpoint, contentType, data): | ||
return 'POST %s REST/1.0\r\nContent-type: %s\r\n\r\n%s' % (endpoint, contentType, data) | ||
|
||
def _parseHttp(data): | ||
headers, data = data.split('\r\n\r\n')[:2] | ||
headers = headers.split('\r\n') | ||
firstLine = headers[0] | ||
headers = dict(h.split(': ') for h in headers[1:]) | ||
return firstLine, headers, data | ||
|
||
def _parseRequest(data): | ||
firstLine, headers, data = _parseHttp(data) | ||
method, url, protocol = firstLine.split(' ', 2) | ||
return Request(protocol, method, url, headers, data) | ||
|
||
def _parseResponse(data): | ||
firstLine, headers, data = _parseHttp(data) | ||
protocol, code, status = firstLine.split(' ', 2) | ||
return Response(protocol, int(code), status, headers, data) | ||
|
||
def _parseResult(data): | ||
data = json.loads(data) | ||
return Result(data['resultCode'], data['message']) | ||
|
||
def _parseStatus(data): | ||
data = json.loads(data) | ||
return Status(data['status'], data['status text'], data['percent'], data['total size']) | ||
|
||
def install(dev, xpdData, statusFunc = None): | ||
"""Sends an xpd file to the camera, lets it access the internet through SSL, waits for the response""" | ||
# Initialize communication | ||
dev.emptyBuffer() | ||
dev.sendInit() | ||
|
||
# Start the installatin by sending the xpd data in a REST request | ||
response = dev.sendRequest(_buildRequest('/task/start', xpd.constants.mimeType, xpdData)) | ||
response = _parseResponse(response) | ||
result = _parseResult(response.data) | ||
if result.code != 0: | ||
raise Exception('Response error %s' % str(result)) | ||
|
||
connectionId = 0 | ||
sock = None | ||
|
||
# Main loop | ||
while True: | ||
if sock != None: | ||
ready = select.select([sock], [], [], 0) | ||
if ready[0]: | ||
# There is data waiting on the socket, let's send it to the camera | ||
resp = sock.recv(4096) | ||
if resp != '': | ||
dev.sendSslData(connectionId, resp) | ||
else: | ||
dev.sendSslEnd(connectionId) | ||
sock.close() | ||
sock = None | ||
|
||
# Receive the next message from the camera | ||
message = dev.receive() | ||
if message == None: | ||
# Nothing received, let's wait | ||
continue | ||
|
||
if isinstance(message, SslStartMessage): | ||
# The camera wants us to open an SSL socket | ||
connectionId = message.connectionId | ||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | ||
sock.connect((message.host, 443)) | ||
elif isinstance(message, SslSendDataMessage) and sock and message.connectionId == connectionId: | ||
# The camera wants to send data over the socket | ||
sock.send(message.data) | ||
elif isinstance(message, RequestMessage): | ||
# The camera sends a REST message | ||
request = _parseRequest(message.data) | ||
if request.url == '/task/progress': | ||
# Progress | ||
status = _parseStatus(request.data) | ||
if statusFunc: | ||
statusFunc(status) | ||
elif request.url == '/task/complete': | ||
# The camera completed the task, let's stop this loop | ||
result = _parseResult(request.data) | ||
dev.sendEnd() | ||
return result | ||
else: | ||
raise Exception("Unknown message url %s" % request.url) | ||
else: | ||
raise Exception("Unknown message %s" % str(message)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
from collections import namedtuple | ||
|
||
from ..util import * | ||
|
||
DeviceInfo = namedtuple('DeviceInfo', 'manufacturer, model, serialNumber, operationsSupported, vendorExtension') | ||
|
||
class MtpDevice: | ||
"""Manages communication with a PTP/MTP device. Inspired by libptp2""" | ||
PTP_OC_GetDeviceInfo = 0x1001 | ||
PTP_OC_OpenSession = 0x1002 | ||
PTP_OC_CloseSession = 0x1003 | ||
PTP_RC_OK = 0x2001 | ||
PTP_RC_SessionNotOpen = 0x2003 | ||
PTP_RC_SessionAlreadyOpened = 0x201E | ||
|
||
def __init__(self, driver): | ||
self.driver = driver | ||
self.openSession() | ||
|
||
def _checkResponse(self, code, acceptedCodes=[]): | ||
if code not in [self.PTP_RC_OK] + acceptedCodes: | ||
raise Exception('Response code not OK: 0x%x' % code) | ||
|
||
def _parseString(self, data, offset): | ||
length = parse8(data[offset:offset+1]) | ||
offset += 1 | ||
end = offset + 2*length | ||
return end, data[offset:end].decode('utf16')[:-1] | ||
|
||
def _parseIntArray(self, data, offset): | ||
length = parse32le(data[offset:offset+4]) | ||
offset += 4 | ||
end = offset + 2*length | ||
return end, [parse16le(data[o:o+2]) for o in range(offset, end, 2)] | ||
|
||
def _parseDeviceInfo(self, data): | ||
offset = 8 | ||
offset, vendorExtension = self._parseString(data, offset) | ||
offset += 2 | ||
|
||
offset, operationsSupported = self._parseIntArray(data, offset) | ||
offset, eventsSupported = self._parseIntArray(data, offset) | ||
offset, devicePropertiesSupported = self._parseIntArray(data, offset) | ||
offset, captureFormats = self._parseIntArray(data, offset) | ||
offset, imageFormats = self._parseIntArray(data, offset) | ||
|
||
offset, manufacturer = self._parseString(data, offset) | ||
offset, model = self._parseString(data, offset) | ||
offset, version = self._parseString(data, offset) | ||
offset, serial = self._parseString(data, offset) | ||
|
||
return DeviceInfo(manufacturer, model, serial, set(operationsSupported), vendorExtension) | ||
|
||
def openSession(self, id=1): | ||
"""Opens a new MTP session""" | ||
response = self.driver.sendCommand(self.PTP_OC_OpenSession, [id]) | ||
self._checkResponse(response, [self.PTP_RC_SessionAlreadyOpened]) | ||
|
||
def closeSession(self): | ||
"""Closes the current session""" | ||
response = self.driver.sendCommand(self.PTP_OC_CloseSession, []) | ||
self._checkResponse(response, [self.PTP_RC_SessionNotOpen]) | ||
|
||
def getDeviceInfo(self): | ||
"""Gets and parses device information""" | ||
response, data = self.driver.sendReadCommand(self.PTP_OC_GetDeviceInfo, []) | ||
self._checkResponse(response) | ||
return self._parseDeviceInfo(data) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
from collections import namedtuple | ||
|
||
USB_CLASS_PTP = 6 | ||
|
||
UsbDevice = namedtuple('UsbDevice', 'handle, idVendor, idProduct, type, manufacturer, product') |
Oops, something went wrong.
658225b
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice and useful script to install an APK without a browser.
I tried this on my Macbook Pro running OSX 10.10. I installed pyusb and libusb backend.
During the install, I seem to get the following error though. Any suggestions ? Thanks!
658225b
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool, thx for trying on a mac!
The script tries to enumerate all your USB devices. Apparently, there are some devices which don't like to reveal their manufacturer / product string (?). Since we don't really need this anyway, try replacing line 12 in pmca/usb/driver/libusb.py by this:
yield UsbDevice(dev, dev.idVendor, dev.idProduct, interface.bInterfaceClass, None, None)
658225b
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, now I was able to get past that error. I actually modified
listDevices
function as follows:It does detect my ILCE-6000 but fails when writing any data to the USB device node and even sudo does not help:
On some searching, I seem to hit this know bug with libusb on OSX that could be one possible reason: http://www.libusb.org/ticket/89
This was mentioned here as well: codedance/Retaliation#20
I'll try this on my Linux box sometime, I'm sure libusb is better supported there and the right udev rules should be sufficient on a linux box afaik.
658225b
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess some system driver claimed the interface. I don't think it's an HID driver though. Can you run
kextstat
before and after plugging in the camera to see which driver is added?See https://together.jolla.com/question/282/usb-file-transfer-support-for-mac-os-x/?answer=41036#post-id-41036
and https://forum.parallels.com/threads/possibly-a-clean-solution-for-usb-device-in-use-problem.5403/
658225b
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
kextstat
shows the same output both before and after. I even captured the output by rebooting my Mac and the before/after does not seem to differ.I also tries unloading few USBCDC drivers but did not help much. I guess I could try unloading drivers one by one and see which one is the culprit.
658225b
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you try
killall PTPCamera
after connecting the camera?