Skip to content

Commit

Permalink
Rewriting to use asyncio with python3.
Browse files Browse the repository at this point in the history
Also refactoring. Also supporting reload messages, etc.
  • Loading branch information
lakinwecker committed Jun 24, 2017
1 parent 2bcc682 commit 71ab137
Show file tree
Hide file tree
Showing 3 changed files with 464 additions and 371 deletions.
342 changes: 276 additions & 66 deletions lichess.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,83 +15,65 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

__all__ = [
"Lichess",
"LoginError",
"StudyConnectionError"
"move_to_path_id",
]

import aiohttp
import asyncio
import json
import random
import pprint
import string
import sys
import time

from collections import defaultdict


# Helpers for making lichess style messages
from chess import PIECE_SYMBOLS, SQUARES, square_file, square_rank

def add_study_chapter_message(name, pgn=None):
return {
"t":"addChapter",
"d":{
"name": name,
"game":None,
"variant":"Automatic",
"fen":None,
"pgn": pgn.strip(),
"orientation":"white",
"mode":"normal",
"initial":False,
"sticky": False
}
}

#-------------------------------------------------------------------------------
# Lichess errors
#-------------------------------------------------------------------------------
class LoginError(RuntimeError):
pass

class StudyConnectionError(RuntimeError):
pass

class StudyNotAContributor(RuntimeError):
pass

#-------------------------------------------------------------------------------
# Some useful constants
#-------------------------------------------------------------------------------
STAGING_DOMAIN = "listage.ovh"
STAGING_URL = "https://{}/".format(STAGING_DOMAIN)
STAGING_WS_URL = "wss://socket.{}".format(STAGING_DOMAIN)

LIVE_DOMAIN = "lichess.org"
LIVE_URL = "https://{}/".format(LIVE_DOMAIN)
LIVE_WS_URL = "wss://socket.{}".format(LIVE_DOMAIN)

#-------------------------------------------------------------------------------
# Converting raw data into dictionaries suitable for the lichess protocol
#-------------------------------------------------------------------------------
def clock_from_comment(comment):
if not "[%clk" in comment:
return None
comment = comment.replace("[%clk ", "")
comment = comment.replace("]", "")
return comment.strip()

promotion_lookup = {
"q": "queen",
"r": "rook",
"b": "bishop",
"n": "knight",
"k": "king",
}
def add_move_to_study(new_node, old_node, chapter_id, path):
uci = old_node.board().uci(new_node.move, chess960=True)
move = {
"t":"anaMove",
"d":{
"orig": uci[:2],
"dest": uci[2:4],
"fen": old_node.board().fen(),
"path": path,
"ch": chapter_id,
"sticky": False,
"promote": True,
}
}
if len(uci) == 5:
move["d"]["promotion"] = promotion_lookup[uci[4]]
clock = clock_from_comment(new_node.comment)
if clock:
move["d"]["clock"] = "{}".format(clock)
return move


#{"t":"setTag","d":{"chapterId":"NNgEcycT","name":"Result","value":"1/2-1/2"}}
def set_tag(chapter_id, tag_name, tag_value):
return {
"t": "setTag",
"d": {
"chapterId": chapter_id,
"name": tag_name,
"value": tag_value,
}
}

def set_comment(chapter_id, path, comment):
return {
"t": "setComment",
"d": {
"ch": chapter_id,
"path": path,
"text": comment
}
}

#-------------------------------------------------------------------------------
# Hashing uci moves into 2 character path names in the lichess study style.
#-------------------------------------------------------------------------------
def hash_code(pos):
return 8 * square_rank(pos) + square_file(pos)

Expand Down Expand Up @@ -168,7 +150,235 @@ def move_to_path_id(move):
to_char(move.to_square)
)
else:
return "" # TODO: what to do for null moves?
raise NotImplementedError("We don't have an implementation for null moves")

headers = {
'Accept': 'application/vnd.lichess.v2+json',
}
#-------------------------------------------------------------------------------
# The objects used to interact with the study.
#-------------------------------------------------------------------------------
class Study:
def __init__(self, lichess, study_id):
self.lichess = lichess
self.study_id = study_id
self.study_path = "study/{}".format(study_id)
self.study_url = self.lichess.url(self.study_path)
self.study_data = {}
self._chapters = {}
self._chapter_versions = defaultdict(int)
self.domain = None
self.sri = "".join([random.choice(string.ascii_letters) for x in range(10)])
self.websocket_url = "wss://socket.{}/{}/socket/v2?sri={}".format(
self.lichess.domain,
self.study_path,
self.sri
)
self.websocket = None
self.websocket_connected = asyncio.Future()
self.should_stop = False

async def connect(self):
await self.sync()
asyncio.ensure_future(self.connect_to_websocket())
await self.websocket_connected

async def connect_to_websocket(self):
async with self.lichess.session.ws_connect(self.websocket_url, headers=headers) as websocket:
self.websocket = websocket
self.websocket_connected.set_result(self.websocket)
ping_future = asyncio.ensure_future(self._ping())
async for msg in websocket:
if msg.type == aiohttp.WSMsgType.TEXT:
data = json.loads(msg.data)
if data['t'] == 'addChapter':
# We got a new chapter, we should sync it
await self.sync_chapter(data['d']['p']['chapterId'])
elif data['t'] == 'reload':
chapter_id = data['d'].get('chapterId')
if chapter_id:
await self.sync_chapter(chapter_id)
else:
await self.sync()
print("<- [RECEIVE]: {}".format(msg.data))
elif msg.type == aiohttp.WSMsgType.CLOSED:
print("Lost connection, disconnecting")
self.should_stop = True
break
elif msg.type == aiohttp.WSMsgType.ERROR:
print("Error, disconnecting")
self.should_stop = True
break
await ping_future

async def _ping(self):
while True:
await self.send({"t": "p"})
await asyncio.sleep(1)
if self.should_stop:
break

# TODO: There has to be a better way to accomplish this O_o
async def send(self, data):
if not self.websocket:
raise RuntimeError("Sending without a websocket!!")
msg_str = json.dumps(data)
print("-> [SENDING]: {}".format(msg_str))
await self.websocket.send_str(msg_str)

def ensure_contributor(self):
members = self.study_data['study']['members']
if self.lichess.username not in members or members[self.lichess.username]['role'] != 'w':
raise StudyNotAContributor("The user must be a contributor to the study")

async def sync(self):
response = await self.lichess.session.get(self.study_url, headers=headers)
if response.status != 200:
raise StudyConnectionError("Unable to connect to the study. {} returned {}".format(
self.study_url,
response.status
))
self.study_data = await response.json()
for chapter in self.study_data['study'].get('chapters', []):
if chapter['id'] not in self._chapters:
await self.sync_chapter(chapter['id'])

async def sync_chapter(self, chapter_id):
chapter_url = "{}/{}?_={}".format(self.study_url, chapter_id, time.time())
print("++ [SYNCING] Chapter: {}".format(chapter_id))
response = await self.lichess.session.get(chapter_url, headers=headers)
if response.status != 200:
raise StudyConnectionError("Unable to connect to the chapter. {} returned {}".format(
chapter_url,
response.status
))
chapter_data = await response.json()

# Convert the tags into a dict
tags = {}
for tag_name, tag_value in chapter_data['study']['chapter']['tags']:
tags[tag_name] = tag_value

chapter_data['tags'] = tags
chapter_data['id'] = chapter_id

self._chapter_versions[chapter_id] = self._chapter_versions[chapter_id] + 1
chapter_data['version'] = self._chapter_versions[chapter_id]
self._chapters[chapter_id] = chapter_data

def get_chapters(self):
return self._chapters.values()

def get_chapter(self, id):
return self._chapters[id]

async def create_chapter_from_pgn(self, pgn):
await self.send({
"t":"addChapter",
"d":{
"name": "Chapter 1", # NOTE: this + pgn causes the chapter to be autonamed.
"game":None,
"variant":"Automatic",
"fen":None,
"pgn": pgn.strip(),
"orientation":"white",
"mode":"normal",
"initial":False,
"sticky": False
}
})

async def add_move(self, chapter_id, path,new_node, old_node):
promotion_lookup = {
"q": "queen",
"r": "rook",
"b": "bishop",
"n": "knight",
"k": "king",
}
uci = old_node.board().uci(new_node.move, chess960=True)
move = {
"t":"anaMove",
"d":{
"orig": uci[:2],
"dest": uci[2:4],
"fen": old_node.board().fen(),
"path": path,
"ch": chapter_id,
"sticky": False,
"promote": True,
}
}
if len(uci) == 5:
move["d"]["promotion"] = promotion_lookup[uci[4]]
clock = clock_from_comment(new_node.comment)
if clock:
move["d"]["clock"] = "{}".format(clock)
await self.send(move)

#{"t":"setTag","d":{"chapterId":"NNgEcycT","name":"Result","value":"1/2-1/2"}}
async def set_tag(self, chapter_id, tag_name, tag_value):
await self.send({
"t": "setTag",
"d": {
"chapterId": chapter_id,
"name": tag_name,
"value": tag_value,
}
})

async def set_comment(self, chapter_id, path, comment):
await self.send({
"t": "setComment",
"d": {
"ch": chapter_id,
"path": path,
"text": comment
}
})




class Lichess:
def __init__(self, loop, session, url):
if url not in [STAGING_URL, LIVE_URL]:
raise RuntimeError("{} is not one of {} or {}".format(
url,
LIVE_URL,
STAGING_URL,
))

self.loop = loop
self.base_url = url
self.domain = "lichess.org" if url == LIVE_URL else "listage.ovh"
self.session = session

def url(self, path, scheme="https"):
"""Generate a lichess url from the given path component.
This is a convenience function that makes for slightly shorter code.
"""
return "{}://{}/{}".format(scheme, self.domain, path)

async def login(self, username, password):
"""Login to lichess using the given credentials.
Raises LoginError if the login is unsuccessful
"""
self.username = username
response = await self.session.post(
self.url('login'),
headers=headers,
data={"username": username, "password": password}
)
if response.status != 200:
raise LoginError("Unable to login")

async def study(self, study_id):
study = Study(self, study_id)
await study.connect()
return study

if __name__ == "__main__":
import doctest
Expand Down
Loading

0 comments on commit 71ab137

Please sign in to comment.