Skip to content
This repository has been archived by the owner on Jun 16, 2023. It is now read-only.

gaspect/PyWars

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

11 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

PyWars

A public topic consumer for Chat Wars ... with steroids

🚀 Motivations

  • 🤖 Build a bot?
  • 👥 Help comunity?
  • 🚀 Personal training?

☄ Quick start

from PyWars import *

app = Client()

@app.agent(Deal)
async def deals(stream:Stream[Deal]):
    async for deal in stream:
        print(deal)

app.run()

📚 Overview

Dummy client creation

from PyWars import Client
app = Client()

This will create a client with an autogenerated id for chat wars v2 api


Specifiying client version

from PyWars import Client
app = Client(version=Client.Version.CW3)

This will create a client for chat wars v3 api


Adding agents

from PyWars import *
app = Client()

@app.agent(Deal)
async def deals(stream:Stream[Deal]):
    async for deal in stream:
        print(deal)

app.run()

The Client.agent method recieves an allowed record. The allowed records are:

  • Deal for cw*-deals topic
  • Duel for cw*-duels topic
  • Offer for cw*-offers topic
  • SexDigest for cw*-sex_digest topic
  • YellowPage for cw*-yellow_pages topic
  • AuctionDigest for cw*-au_digest topic

Adding timers

from PyWars import *
app = Client()
procesed_deals = 0

@app.agent(Deal)
async def deals(stream:Stream[Deal]):
    async for deal in stream:
        procesed_deals += 1

@app.timer(60)
async def print_procesed():
    print(procesed_deals)
    procesed_deals = 0

app.run()

A timer is a courutine that is triggered every n seconds in the previous examples we used 60 seconds.


Using executions loops

from PyWars import *
import asyncio

app = Client(loop=asyncio.get_event_loop())

@app.agent(Deal)
async def deals(stream:Stream[Deal]):
    async for deal in stream:
        print(deal)
try:
    app.start()
finally:
    app.stop()

The magical start and stop methods were thought to run and stop client with his execution loop smootly


Bulking

from PyWars import *

app = Client()

@app.agent(Deal)
async def deals(stream:Stream[Deal]):
    async for bulk in stream.take(100, 10):
        print(bulk)

app.run()

A bulk is just the use of Stream.take method from faust. It will try to take bulks of 100 objetcs, in case that it can´t return the 100 objects its going to wait for 10 seconds and return any amount gathered in that time.


For extended documentation see for the docs page.