How to Monitor Twitter Accounts and Keywords in Real Time via API
Real-time Twitter monitoring - detecting new tweets from specific accounts, tracking keyword mentions as they happen, and feeding live X data into your applications - is the foundation of social listening, breaking news alerts, competitive intelligence, and automated engagement tools. This guide shows you how to build a near-real-time monitoring system using the Sorsa API that detects new tweets within seconds of posting, with complete working examples in Python and JavaScript.
Sorsa API delivers fresh data on every request: if a tweet was posted half a second ago, your next API call will return it. Combined with a rate limit of 20 requests per second and response times under 400ms, you can build monitoring loops that rival dedicated streaming services - with the simplicity of standard REST calls and without managing persistent connections, reconnection logic, or complex authentication flows.
How Polling-Based Monitoring Works
There are two approaches to getting real-time data from a social platform: push-based (streaming/webhooks) and pull-based (polling). The official X API offers a Filtered Stream endpoint, but it requires expensive access tiers ($5,000+/month Pro plan), persistent HTTP connections with reconnection logic, and careful rule management.
Sorsa API uses a pull-based approach. The pattern has four steps:
- Poll an endpoint at a regular interval (every 1-30 seconds).
- Compare results against previously seen tweet IDs to identify new tweets.
- Process new tweets - send alerts, store in a database, post to Slack, etc.
- Repeat.
You control the polling interval, so you can balance latency against API usage. There are no persistent connections to manage and no reconnection strategies to implement, and if your script crashes, it can pick up where it left off on the next poll (provided you persist the last seen ID; see Production Hardening below). Deduplication is reliable because X tweet IDs (Snowflake IDs) are monotonically increasing - a higher ID always means a newer tweet.
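Because Snowflake IDs embed a timestamp, you can even recover when a tweet was posted without any extra API call. A minimal sketch, assuming X's published Snowflake layout (millisecond timestamp in the top bits, epoch 1288834974657, i.e., Nov 4, 2010 UTC):

```python
from datetime import datetime, timezone

TWITTER_EPOCH_MS = 1288834974657  # X's Snowflake epoch: Nov 4, 2010, UTC

def snowflake_to_datetime(tweet_id: int) -> datetime:
    """The top bits of a tweet ID encode milliseconds since X's Snowflake epoch."""
    ms = (tweet_id >> 22) + TWITTER_EPOCH_MS
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)
```

Since the timestamp occupies the most significant bits, comparing two IDs numerically is equivalent to comparing their posting times, which is exactly why the deduplication in the examples below is a simple `>` check.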
Choosing the Right Endpoint
| What you want to monitor | Endpoint | Method | Why this one |
|---|---|---|---|
| A single account | /user-tweets | POST | Returns the latest tweets from one user’s timeline. |
| A group of accounts (up to 5,000) | /list-tweets | GET | Single request covers all members of an X List. |
| A keyword or hashtag | /search-tweets | POST | Full search operator support, chronological ordering. |
| @mentions of an account | /mentions | POST | Purpose-built for mention tracking with engagement filters. |
Level 1: Monitor a Single Account
The simplest case. You want to know the moment a specific account posts something new - a competitor, a CEO, a regulator, or an influencer.
Python
```python
import requests
import time

API_KEY = "YOUR_API_KEY"
USERNAME = "elonmusk"
POLL_INTERVAL = 5  # seconds

URL = "https://api.sorsa.io/v3/user-tweets"
HEADERS = {"ApiKey": API_KEY, "Content-Type": "application/json"}

last_seen_id = None
print(f"Monitoring @{USERNAME}...")

while True:
    try:
        resp = requests.post(URL, headers=HEADERS, json={"link": f"https://x.com/{USERNAME}"})
        resp.raise_for_status()
        tweets = resp.json().get("tweets", [])
        if tweets:
            if last_seen_id is None:
                last_seen_id = tweets[0]["id"]
                print(f"Baseline set: {last_seen_id}")
            else:
                new_tweets = [t for t in tweets if t["id"] > last_seen_id]
                for tweet in reversed(new_tweets):
                    print(f"[NEW] @{USERNAME}: {tweet['full_text'][:140]}")
                if new_tweets:
                    last_seen_id = new_tweets[0]["id"]
    except Exception as e:
        print(f"Error: {e}")
        time.sleep(POLL_INTERVAL * 2)
        continue
    time.sleep(POLL_INTERVAL)
```
JavaScript
```javascript
// Run as an ES module (Node 18+) so top-level await is available.
const API_KEY = "YOUR_API_KEY";
const USERNAME = "elonmusk";
const POLL_INTERVAL = 5000; // milliseconds

let lastSeenId = null;
console.log(`Monitoring @${USERNAME}...`);

while (true) {
  try {
    const resp = await fetch("https://api.sorsa.io/v3/user-tweets", {
      method: "POST",
      headers: { "ApiKey": API_KEY, "Content-Type": "application/json" },
      body: JSON.stringify({ link: `https://x.com/${USERNAME}` }),
    });
    if (!resp.ok) throw new Error(`HTTP ${resp.status}`);
    const tweets = (await resp.json()).tweets || [];
    if (tweets.length > 0) {
      if (lastSeenId === null) {
        lastSeenId = tweets[0].id;
        console.log(`Baseline set: ${lastSeenId}`);
      } else {
        const newTweets = tweets.filter((t) => t.id > lastSeenId);
        for (const t of [...newTweets].reverse()) {
          console.log(`[NEW] @${USERNAME}: ${t.full_text.slice(0, 140)}`);
        }
        if (newTweets.length) lastSeenId = newTweets[0].id;
      }
    }
  } catch (err) {
    console.error(`Error: ${err.message}`);
    await new Promise((r) => setTimeout(r, POLL_INTERVAL * 2));
    continue;
  }
  await new Promise((r) => setTimeout(r, POLL_INTERVAL));
}
```
This works, but it scales poorly. Monitoring 50 accounts means 50 separate polling loops and 50x the API requests. That is where X Lists come in.
Level 2: Monitor Many Accounts with a Single Request
X Lists let you group up to 5,000 accounts and fetch their combined activity in one API call via /list-tweets. This is the most efficient approach for multi-account monitoring and the pattern you should default to in production. For more on working with Lists, see Lists & Communities.
Step 1: Create a Public X List
- Go to X Lists and create a new list.
- Add the accounts you want to monitor.
- Make sure the list is Public - private lists cannot be accessed via the API.
- Copy the List ID from the URL (e.g., in https://x.com/i/lists/1234567890, the ID is 1234567890).
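If you manage many lists programmatically, pulling the ID out of a pasted URL is a one-line regex. A small sketch (the `extract_list_id` helper name is our own, not part of the Sorsa API):

```python
import re

def extract_list_id(url: str) -> str:
    """Pull the numeric List ID out of an x.com list URL."""
    m = re.search(r"/i/lists/(\d+)", url)
    if not m:
        raise ValueError(f"No list ID found in {url!r}")
    return m.group(1)
```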
Step 2: Poll the List
```python
import requests
import time

API_KEY = "YOUR_API_KEY"
LIST_ID = "YOUR_LIST_ID"
POLL_INTERVAL = 5  # seconds

URL = f"https://api.sorsa.io/v3/list-tweets?list_id={LIST_ID}"
HEADERS = {"ApiKey": API_KEY, "Accept": "application/json"}

def monitor_list(callback):
    """Poll an X List and call `callback` for each new tweet detected."""
    last_seen_id = None
    print(f"Monitoring List {LIST_ID} (interval: {POLL_INTERVAL}s)")
    while True:
        try:
            resp = requests.get(URL, headers=HEADERS)
            resp.raise_for_status()
            tweets = resp.json().get("tweets", [])
            if not tweets:
                time.sleep(POLL_INTERVAL)
                continue
            if last_seen_id is None:
                last_seen_id = tweets[0]["id"]
                print(f"Baseline set: {last_seen_id}")
            else:
                new_tweets = [t for t in tweets if t["id"] > last_seen_id]
                if new_tweets:
                    for tweet in reversed(new_tweets):
                        callback(tweet)
                    last_seen_id = new_tweets[0]["id"]
        except requests.exceptions.RequestException as e:
            print(f"Request error: {e}. Retrying in {POLL_INTERVAL * 2}s...")
            time.sleep(POLL_INTERVAL * 2)
            continue
        time.sleep(POLL_INTERVAL)

def on_new_tweet(tweet):
    user = tweet["user"]
    print(f"[NEW] @{user['username']}: {tweet['full_text'][:120]}")
    print(f"  Likes: {tweet.get('likes_count', 0)} | "
          f"RTs: {tweet.get('retweet_count', 0)} | "
          f"Views: {tweet.get('view_count', 'N/A')}\n")

if __name__ == "__main__":
    monitor_list(on_new_tweet)
```
The efficiency gain is massive. Monitoring 50 accounts individually at 10-second intervals costs 50 x 8,640 = 432,000 requests/day. Putting those same 50 accounts into one X List and polling /list-tweets costs 8,640 requests/day - a 50x reduction. For more cost optimization patterns like this, see Optimizing API Usage.
Level 3: Monitor a Keyword or Hashtag
Instead of tracking specific accounts, you can monitor a keyword, hashtag, or complex search query in real time. This uses /search-tweets with order: "latest" for chronological results.
```python
import requests
import time

API_KEY = "YOUR_API_KEY"
QUERY = '"your brand" OR @yourbrand lang:en'
POLL_INTERVAL = 10  # seconds

URL = "https://api.sorsa.io/v3/search-tweets"
HEADERS = {"ApiKey": API_KEY, "Content-Type": "application/json"}

def monitor_keyword(query, callback, interval=10):
    last_seen_id = None
    print(f"Monitoring: {query} (interval: {interval}s)")
    while True:
        try:
            resp = requests.post(URL, headers=HEADERS, json={"query": query, "order": "latest"})
            resp.raise_for_status()
            tweets = resp.json().get("tweets", [])
            if tweets:
                if last_seen_id is None:
                    last_seen_id = tweets[0]["id"]
                    print(f"Baseline set: {last_seen_id}")
                else:
                    new_tweets = [t for t in tweets if t["id"] > last_seen_id]
                    for tweet in reversed(new_tweets):
                        callback(tweet)
                    if new_tweets:
                        last_seen_id = new_tweets[0]["id"]
        except Exception as e:
            print(f"Error: {e}")
            time.sleep(interval * 2)
            continue
        time.sleep(interval)

# Reuses the on_new_tweet callback from the Level 2 example:
monitor_keyword(QUERY, on_new_tweet, interval=10)
```
You can use any search operator in the query string. For example, to monitor only high-engagement mentions of your brand in English, excluding retweets:
```python
monitor_keyword('"your brand" min_faves:10 lang:en -filter:retweets', on_new_tweet)
```
The monitoring loop detects new tweets; the callback decides what to do with them. Here is a callback that forwards each tweet to a Slack channel via an Incoming Webhook:
```python
import requests

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"

def send_to_slack(tweet):
    user = tweet["user"]
    text = (
        f"*New tweet from @{user['username']}*\n"
        f"{tweet['full_text']}\n"
        f"Likes: {tweet.get('likes_count', 0)} | "
        f"RTs: {tweet.get('retweet_count', 0)} | "
        f"Views: {tweet.get('view_count', 'N/A')}\n"
        f"https://x.com/{user['username']}/status/{tweet['id']}"
    )
    requests.post(SLACK_WEBHOOK_URL, json={"text": text})

# Plug it into any monitor:
monitor_list(send_to_slack)
# or: monitor_keyword("bitcoin lang:en min_faves:50", send_to_slack)
```
The same pattern works for Discord (different JSON format), Telegram (Bot API call), or any HTTP endpoint your system exposes. You are effectively building your own webhook relay - Sorsa provides the data, your script routes it.
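As an illustration of the Discord variant: Discord Incoming Webhooks accept a JSON body with a `content` field capped at 2,000 characters. A hedged sketch (the webhook URL is a placeholder, and `format_discord_payload` is our own helper, not a Sorsa or Discord API name):

```python
import requests

DISCORD_WEBHOOK_URL = "https://discord.com/api/webhooks/YOUR/WEBHOOK"  # placeholder

def format_discord_payload(tweet: dict) -> dict:
    """Build the JSON body Discord expects: a single `content` string, max 2000 chars."""
    user = tweet["user"]
    text = (
        f"**New tweet from @{user['username']}**\n"
        f"{tweet['full_text']}\n"
        f"https://x.com/{user['username']}/status/{tweet['id']}"
    )
    return {"content": text[:2000]}

def send_to_discord(tweet: dict) -> None:
    requests.post(DISCORD_WEBHOOK_URL, json=format_discord_payload(tweet), timeout=10)
```

Separating payload formatting from sending keeps the formatter testable without any network access.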
API Usage Calculator
Polling uses one request per cycle. Before choosing your interval, consider the monthly cost:
| Interval | Req/Hour | Req/Day | Req/Month (30d) |
|---|---|---|---|
| 1 second | 3,600 | 86,400 | 2,592,000 |
| 5 seconds | 720 | 17,280 | 518,400 |
| 10 seconds | 360 | 8,640 | 259,200 |
| 30 seconds | 120 | 2,880 | 86,400 |
| 1 minute | 60 | 1,440 | 43,200 |
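The table values follow from one formula: requests per day is 86,400 seconds divided by the interval, times one request per cycle. A quick sketch to estimate any interval not listed above:

```python
def monthly_requests(interval_seconds: int, days: int = 30) -> int:
    """One request per poll cycle, polling around the clock."""
    return (86_400 // interval_seconds) * days

for interval in (1, 5, 10, 30, 60):
    print(f"{interval:>2}s -> {monthly_requests(interval):,} requests/month")
```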
For most social listening and brand monitoring, polling every 10-30 seconds is sufficient - you catch any tweet within half a minute of posting. Reserve 1-5 second intervals for financial signal detection or breaking news bots. See Pricing to estimate your monthly cost based on these numbers.
For high-volume monitoring needs that exceed standard plan limits, contact us at contacts@sorsa.io or on Discord for custom enterprise quotas and volume discounts.
Production Hardening
The examples above work for development and testing. For production, address these four concerns:
1. Persist last_seen_id Across Restarts
If your script crashes and restarts without remembering its checkpoint, it either reprocesses old tweets (duplicate alerts) or misses the gap entirely. Store the last seen ID in a file, database, or Redis:
```python
import json
import os

STATE_FILE = "monitor_state.json"

def load_state():
    if os.path.exists(STATE_FILE):
        with open(STATE_FILE) as f:
            return json.load(f).get("last_seen_id")
    return None

def save_state(last_seen_id):
    with open(STATE_FILE, "w") as f:
        json.dump({"last_seen_id": last_seen_id}, f)
```
2. Implement Exponential Backoff for Errors
Network issues, rate limits (HTTP 429), and temporary API errors will happen. Instead of retrying immediately, back off gradually. For the full error code reference, see Error Codes.
```python
retry_delay = POLL_INTERVAL

while True:
    try:
        resp = requests.get(URL, headers=HEADERS)
        if resp.status_code == 429:
            print("Rate limited. Backing off...")
            retry_delay = min(retry_delay * 2, 60)
            time.sleep(retry_delay)
            continue
        resp.raise_for_status()
        retry_delay = POLL_INTERVAL  # Reset on success
        # ... process tweets ...
    except Exception as e:
        print(f"Error: {e}")
        retry_delay = min(retry_delay * 2, 60)
        time.sleep(retry_delay)
        continue
    time.sleep(POLL_INTERVAL)
```
3. Separate Polling from Processing
Do not run expensive operations (NLP, database writes, external API calls) synchronously inside the polling loop. If a downstream system is slow, your polling loop falls behind schedule. Instead, push new tweets into a queue and process them in a separate worker:
```python
import threading
import time
from collections import deque

tweet_queue = deque()

def polling_loop():
    """Fast loop: poll and enqueue. No heavy processing here."""
    # ... standard polling code ...
    # Instead of calling callback(tweet), do:
    tweet_queue.append(tweet)

def processing_worker():
    """Separate thread: dequeue and handle."""
    while True:
        if tweet_queue:
            tweet = tweet_queue.popleft()
            send_to_slack(tweet)  # or any expensive operation
            save_to_database(tweet)
        else:
            time.sleep(0.1)

threading.Thread(target=processing_worker, daemon=True).start()
polling_loop()
```
For heavier workloads, replace the in-memory deque with Redis, RabbitMQ, or any message broker your stack already uses.
4. Monitor Your Monitor
In production, log each poll cycle (timestamp, new tweet count, errors). Set up an alert if the monitor has not completed a successful poll in the last N minutes - this catches silent failures before they become data gaps. You can check the API’s operational status at any time on the Sorsa Status Page.
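One way to implement that alert is a staleness check that a separate health probe (a cron job, a Kubernetes liveness endpoint, etc.) can call. A minimal sketch; the `is_stale` helper and the 5-minute threshold are our own example choices:

```python
import time
from typing import Optional

ALERT_AFTER_SECONDS = 300  # example threshold: 5 minutes without a successful poll

def is_stale(last_success_ts: float, now: Optional[float] = None) -> bool:
    """True when the last successful poll is older than the alert window."""
    if now is None:
        now = time.time()
    return (now - last_success_ts) > ALERT_AFTER_SECONDS

# In the monitor: after each successful poll, record last_success_ts = time.time()
# and persist it; the health probe reads it back and pages you if is_stale() is True.
```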
Next Steps
- Search Operators - use advanced filters to reduce noise in keyword-based monitoring.
- Track Mentions - dedicated endpoint for tracking @mentions with engagement filters.
- Rate Limits - handling 429 errors and optimizing request patterns.
- Pagination - backfill historical data alongside real-time monitoring.
- API Reference - full specification for /list-tweets, /user-tweets, /search-tweets, and all Sorsa API endpoints.