Introduction

This connector uses the Twitter Streaming API to listen for status update messages and converts them to Kafka Connect structs on the fly. The goal is to match as much of the Twitter Status object as possible.

Configuration

TwitterSourceConnector

This Twitter source connector pulls data from Twitter in real time.

name=connector1
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector

# Set these required values
twitter.oauth.accessTokenSecret=
process.deletes=
filter.keywords=
kafka.status.topic=
kafka.delete.topic=
twitter.oauth.consumerSecret=
twitter.oauth.accessToken=
twitter.oauth.consumerKey=
| Name | Description | Type | Default | Valid Values | Importance |
|---|---|---|---|---|---|
| filter.keywords | Twitter keywords to filter for. | list | | | high |
| filter.userIds | Twitter user IDs to follow. | list | "" | | low |
| kafka.delete.topic | Kafka topic to write delete events to. | string | | | high |
| kafka.status.topic | Kafka topic to write the statuses to. | string | | | high |
| process.deletes | Whether this connector should process deletes. | boolean | | | high |
| twitter.oauth.accessToken | OAuth access token. | password | | | high |
| twitter.oauth.accessTokenSecret | OAuth access token secret. | password | | | high |
| twitter.oauth.consumerKey | OAuth consumer key. | password | | | high |
| twitter.oauth.consumerSecret | OAuth consumer secret. | password | | | high |
| twitter.debug | Flag to enable debug logging for the Twitter API. | boolean | false | | low |
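Before starting the connector, it can help to check a properties file against the table above. The sketch below is a hypothetical helper, not part of the connector: it uses only java.util.Properties, treats every setting listed without a default as required, and assumes list values such as filter.keywords are comma-separated, which is the usual Kafka Connect convention.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check for a TwitterSourceConnector properties file.
// Not part of the connector; it only mirrors the configuration table above.
class TwitterConfigCheck {
    // Settings listed without a default value, treated here as required.
    static final List<String> REQUIRED = Arrays.asList(
            "filter.keywords", "kafka.status.topic", "kafka.delete.topic",
            "process.deletes", "twitter.oauth.consumerKey",
            "twitter.oauth.consumerSecret", "twitter.oauth.accessToken",
            "twitter.oauth.accessTokenSecret");

    // Parses properties text (same syntax as the .properties file above).
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns the required keys that are absent or left blank (e.g. "key=").
    static List<String> missingRequired(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    // Splits a list-typed value such as filter.keywords on commas (assumed
    // convention), trimming whitespace and dropping empty items.
    static List<String> parseList(String value) {
        if (value == null) {
            return Collections.emptyList();
        }
        List<String> items = new ArrayList<>();
        for (String item : value.split(",")) {
            String trimmed = item.trim();
            if (!trimmed.isEmpty()) {
                items.add(trimmed);
            }
        }
        return items;
    }

    public static void main(String[] args) {
        Properties props = load("filter.keywords=kafka, streaming\nkafka.status.topic=twitter\n");
        System.out.println("Missing required settings: " + missingRequired(props));
        System.out.println("Keywords: " + parseList(props.getProperty("filter.keywords")));
    }
}
```

Kafka Connect validates the configuration itself when the connector is created; a check like this only surfaces blank template values earlier.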

Schemas

com.github.jcustenborder.kafka.connect.twitter.Place

Returns the place attached to this status

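| Name | Optional | Schema | Default Value | Documentation |
|---|---|---|---|---|
| Name | true | String | | |
| StreetAddress | true | String | | |
| CountryCode | true | String | | |
| Id | true | String | | |
| Country | true | String | | |
| PlaceType | true | String | | |
| URL | true | String | | |
| FullName | true | String | | |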

com.github.jcustenborder.kafka.connect.twitter.GeoLocation

Returns the location that this tweet refers to, if available.

| Name | Optional | Schema | Default Value | Documentation |
|---|---|---|---|---|
| Latitude | false | Float64 | | Returns the latitude of the geo location |
| Longitude | false | Float64 | | Returns the longitude of the geo location |
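In the Status schema, GeoLocation itself is optional, while both of its fields are required. The hypothetical sketch below shows what that combination means when a value is converted: a missing location becomes null, but a location with a missing coordinate is rejected. GeoPoint and toFields are illustrative names, and the sketch builds a plain map; the connector itself produces Kafka Connect structs.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch of converting a location into fields that follow the
// GeoLocation schema: Latitude and Longitude are both non-optional Float64.
class GeoLocationFields {
    // Stand-in for the source object the connector reads; not a real API type.
    static final class GeoPoint {
        final Double latitude;
        final Double longitude;

        GeoPoint(Double latitude, Double longitude) {
            this.latitude = latitude;
            this.longitude = longitude;
        }
    }

    // Returns null when there is no location (Status.GeoLocation is optional);
    // otherwise both coordinates must be present, mirroring Optional = false.
    static Map<String, Object> toFields(GeoPoint point) {
        if (point == null) {
            return null;
        }
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("Latitude", Objects.requireNonNull(point.latitude, "Latitude is required"));
        fields.put("Longitude", Objects.requireNonNull(point.longitude, "Longitude is required"));
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(toFields(new GeoPoint(51.5, -0.12)));
        System.out.println(toFields(null));
    }
}
```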

com.github.jcustenborder.kafka.connect.twitter.StatusDeletionNotice

Message that is received when a status is deleted from Twitter.

| Name | Optional | Schema | Default Value | Documentation |
|---|---|---|---|---|
| StatusId | false | Int64 | | |
| UserId | false | Int64 | | |

com.github.jcustenborder.kafka.connect.twitter.StatusDeletionNoticeKey

Key for a message that is received when a status is deleted from Twitter.

| Name | Optional | Schema | Default Value | Documentation |
|---|---|---|---|---|
| StatusId | false | Int64 | | |

com.github.jcustenborder.kafka.connect.twitter.StatusKey

Key for a Twitter status.

| Name | Optional | Schema | Default Value | Documentation |
|---|---|---|---|---|
| Id | true | Int64 | | |
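Statuses and deletion notices are written to separate topics (kafka.status.topic and kafka.delete.topic), but StatusKey.Id and StatusDeletionNoticeKey.StatusId both carry an Int64 status ID, so a consumer can correlate them. The sketch below is a hypothetical in-memory view, assuming process.deletes is enabled so that deletion notices are actually produced; StatusStore and its methods are illustrative, not connector API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical consumer-side view: remembers status text by status ID and
// forgets it when a deletion notice for the same ID arrives.
class StatusStore {
    private final Map<Long, String> textById = new HashMap<>();

    // For records read from kafka.status.topic. StatusKey.Id is optional, so a
    // record without an ID cannot be correlated and is skipped.
    void onStatus(Long statusKeyId, String text) {
        if (statusKeyId != null) {
            textById.put(statusKeyId, text);
        }
    }

    // For records read from kafka.delete.topic. StatusDeletionNoticeKey.StatusId
    // is required. Returns true if the deleted status had been seen before.
    boolean onDeletion(long deletedStatusId) {
        boolean known = textById.containsKey(deletedStatusId);
        textById.remove(deletedStatusId);
        return known;
    }

    int size() {
        return textById.size();
    }

    public static void main(String[] args) {
        StatusStore store = new StatusStore();
        store.onStatus(42L, "hello");
        System.out.println("Deletion matched a known status: " + store.onDeletion(42L));
    }
}
```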

com.github.jcustenborder.kafka.connect.twitter.Status

Twitter status message.

| Name | Optional | Schema | Default Value | Documentation |
|---|---|---|---|---|
| CreatedAt | true | Timestamp | | Returns the created_at timestamp |
| Id | true | Int64 | | Returns the id of the status |
| Text | true | String | | Returns the text of the status |
| Source | true | String | | Returns the source |
| Truncated | true | Boolean | | Tests if the status is truncated |
| InReplyToStatusId | true | Int64 | | Returns the in_reply_to_status_id |
| InReplyToUserId | true | Int64 | | Returns the in_reply_to_user_id |
| InReplyToScreenName | true | String | | Returns the in_reply_to_screen_name |
| GeoLocation | true | com.github.jcustenborder.kafka.connect.twitter.GeoLocation | | Returns the location that this tweet refers to, if available. |
| Place | true | com.github.jcustenborder.kafka.connect.twitter.Place | | Returns the place attached to this status |
| Favorited | true | Boolean | | Tests if the status is favorited |
| Retweeted | true | Boolean | | Tests if the status is retweeted |
| FavoriteCount | true | Int32 | | Indicates approximately how many times this Tweet has been "favorited" by Twitter users. |
| User | false | com.github.jcustenborder.kafka.connect.twitter.User | | Returns the user associated with the status. This can be null if the instance is from User.getStatus(). |
| Retweet | true | Boolean | | |
| Contributors | false | Array of Int64 | | Returns an array of contributors, or null if no contributor is associated with this status. |
| RetweetCount | true | Int32 | | Returns the number of times this tweet has been retweeted, or -1 when the tweet was created before this feature was enabled. |
| RetweetedByMe | true | Boolean | | |
| CurrentUserRetweetId | true | Int64 | | Returns the ID of the authenticating user's retweet of this tweet, or -1L when the tweet was created before this feature was enabled. |
| PossiblySensitive | true | Boolean | | |
| Lang | true | String | | Returns the language of the status text, if available. |
| WithheldInCountries | false | Array of String | | Returns the list of country codes where the tweet is withheld |
| HashtagEntities | true | Array of com.github.jcustenborder.kafka.connect.twitter.HashtagEntity | | Returns an array of hashtags mentioned in the tweet. |
| UserMentionEntities | true | Array of com.github.jcustenborder.kafka.connect.twitter.UserMentionEntity | | Returns an array of user mentions in the tweet. |
| MediaEntities | true | Array of com.github.jcustenborder.kafka.connect.twitter.MediaEntity | | Returns an array of MediaEntities if media are available in the tweet. |
| SymbolEntities | true | Array of com.github.jcustenborder.kafka.connect.twitter.SymbolEntity | | Returns an array of SymbolEntities if symbols are available in the tweet. |
| URLEntities | true | Array of com.github.jcustenborder.kafka.connect.twitter.URLEntity | | Returns an array of URLEntities mentioned in the tweet. |
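Several Status fields use sentinel values rather than null: RetweetCount is -1 and CurrentUserRetweetId is -1L when the tweet was created before the feature was enabled. A consumer may prefer explicit empty values. The helpers below are a hypothetical sketch that maps both the sentinel and a missing (null) optional field to an empty OptionalInt or OptionalLong.

```java
import java.util.OptionalInt;
import java.util.OptionalLong;

// Hypothetical helpers that turn the documented -1 / -1L sentinels (and a
// missing optional value) into explicit empty optionals.
class StatusSentinels {
    // RetweetCount: -1 means the tweet was created before this feature was enabled.
    static OptionalInt retweetCount(Integer raw) {
        if (raw == null || raw == -1) {
            return OptionalInt.empty();
        }
        return OptionalInt.of(raw);
    }

    // CurrentUserRetweetId: -1L carries the same meaning for this field.
    static OptionalLong currentUserRetweetId(Long raw) {
        if (raw == null || raw == -1L) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(raw);
    }

    public static void main(String[] args) {
        System.out.println(retweetCount(-1));
        System.out.println(retweetCount(7));
        System.out.println(currentUserRetweetId(-1L));
    }
}
```

The same treatment fits User.ListedCount, which is documented as -1 when the count is unavailable.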

com.github.jcustenborder.kafka.connect.twitter.User

Returns the user associated with the status. This can be null if the instance is from User.getStatus().

| Name | Optional | Schema | Default Value | Documentation |
|---|---|---|---|---|
| Id | true | Int64 | | Returns the id of the user |
| Name | true | String | | Returns the name of the user |
| ScreenName | true | String | | Returns the screen name of the user |
| Location | true | String | | Returns the location of the user |
| Description | true | String | | Returns the description of the user |
| ContributorsEnabled | true | Boolean | | Tests if the user has enabled contributors |
| ProfileImageURL | true | String | | Returns the profile image URL of the user |
| BiggerProfileImageURL | true | String | | |
| MiniProfileImageURL | true | String | | |
| OriginalProfileImageURL | true | String | | |
| ProfileImageURLHttps | true | String | | |
| BiggerProfileImageURLHttps | true | String | | |
| MiniProfileImageURLHttps | true | String | | |
| OriginalProfileImageURLHttps | true | String | | |
| DefaultProfileImage | true | Boolean | | Tests if the user has not uploaded their own avatar |
| URL | true | String | | Returns the URL of the user |
| Protected | true | Boolean | | Tests if the user's status is protected |
| FollowersCount | true | Int32 | | Returns the number of followers |
| ProfileBackgroundColor | true | String | | |
| ProfileTextColor | true | String | | |
| ProfileLinkColor | true | String | | |
| ProfileSidebarFillColor | true | String | | |
| ProfileSidebarBorderColor | true | String | | |
| ProfileUseBackgroundImage | true | Boolean | | |
| DefaultProfile | true | Boolean | | Tests if the user has not altered the theme or background |
| ShowAllInlineMedia | true | Boolean | | |
| FriendsCount | true | Int32 | | Returns the number of users the user follows (AKA "followings") |
| CreatedAt | true | Timestamp | | |
| FavouritesCount | true | Int32 | | |
| UtcOffset | true | Int32 | | |
| TimeZone | true | String | | |
| ProfileBackgroundImageURL | true | String | | |
| ProfileBackgroundImageUrlHttps | true | String | | |
| ProfileBannerURL | true | String | | |
| ProfileBannerRetinaURL | true | String | | |
| ProfileBannerIPadURL | true | String | | |
| ProfileBannerIPadRetinaURL | true | String | | |
| ProfileBannerMobileURL | true | String | | |
| ProfileBannerMobileRetinaURL | true | String | | |
| ProfileBackgroundTiled | true | Boolean | | |
| Lang | true | String | | Returns the preferred language of the user |
| StatusesCount | true | Int32 | | |
| GeoEnabled | true | Boolean | | |
| Verified | true | Boolean | | |
| Translator | true | Boolean | | |
| ListedCount | true | Int32 | | Returns the number of public lists the user is listed on, or -1 if the count is unavailable. |
| FollowRequestSent | true | Boolean | | Returns true if the authenticating user has requested to follow this user, otherwise false. |
| WithheldInCountries | false | Array of String | | Returns the list of country codes where the user is withheld |

com.github.jcustenborder.kafka.connect.twitter.ExtendedMediaEntity.Variant

| Name | Optional | Schema | Default Value | Documentation |
|---|---|---|---|---|
| Url | true | String | | |
| Bitrate | true | Int32 | | |
| ContentType | true | String | | |

com.github.jcustenborder.kafka.connect.twitter.MediaEntity.Size

| Name | Optional | Schema | Default Value | Documentation |
|---|---|---|---|---|
| Resize | true | Int32 | | |
| Width | true | Int32 | | |
| Height | true | Int32 | | |
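The Sizes field of MediaEntity and ExtendedMediaEntity maps an Int32 key to one of these Size structs. The sketch below picks the rendition with the largest pixel area. It treats the map keys as opaque codes rather than assuming what each number means, models only Width and Height, and counts a missing dimension as zero; LargestSize is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: pick the largest rendition from a "Sizes" map.
// Only Width and Height from MediaEntity.Size are modelled here.
class LargestSize {
    static final class Size {
        final Integer width;  // optional Int32 in the schema
        final Integer height; // optional Int32 in the schema

        Size(Integer width, Integer height) {
            this.width = width;
            this.height = height;
        }

        // Pixel area, counting a missing dimension as zero.
        long area() {
            long w = width == null ? 0 : width;
            long h = height == null ? 0 : height;
            return w * h;
        }
    }

    // Returns the key of the entry with the largest area, or null for an empty map.
    // Keys are treated as opaque size codes.
    static Integer largestKey(Map<Integer, Size> sizes) {
        Integer bestKey = null;
        long bestArea = -1;
        for (Map.Entry<Integer, Size> entry : sizes.entrySet()) {
            long area = entry.getValue().area();
            if (area > bestArea) {
                bestArea = area;
                bestKey = entry.getKey();
            }
        }
        return bestKey;
    }

    public static void main(String[] args) {
        Map<Integer, Size> sizes = new HashMap<>();
        sizes.put(0, new Size(150, 150));
        sizes.put(3, new Size(1024, 768));
        System.out.println("Largest size key: " + largestKey(sizes));
    }
}
```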

com.github.jcustenborder.kafka.connect.twitter.ExtendedMediaEntity

| Name | Optional | Schema | Default Value | Documentation |
|---|---|---|---|---|
| VideoAspectRatioWidth | true | Int32 | | |
| VideoAspectRatioHeight | true | Int32 | | |
| VideoDurationMillis | true | Int64 | | |
| VideoVariants | true | Array of com.github.jcustenborder.kafka.connect.twitter.ExtendedMediaEntity.Variant | | |
| ExtAltText | true | String | | |
| Id | true | Int64 | | Returns the id of the media. |
| Type | true | String | | Returns the media type (photo, video, or animated_gif). |
| MediaURL | true | String | | Returns the media URL. |
| Sizes | false | Map of <Int32, com.github.jcustenborder.kafka.connect.twitter.MediaEntity.Size> | | Returns size variations of the media. |
| MediaURLHttps | true | String | | Returns the media secure URL. |
| URL | true | String | | Returns the URL mentioned in the tweet. |
| Text | true | String | | Returns the URL mentioned in the tweet. |
| ExpandedURL | true | String | | Returns the expanded URL if the mentioned URL is shortened. |
| Start | true | Int32 | | Returns the index of the start character of the URL mentioned in the tweet. |
| End | true | Int32 | | Returns the index of the end character of the URL mentioned in the tweet. |
| DisplayURL | true | String | | Returns the display URL if the mentioned URL is shortened. |
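VideoVariants lists the available encodings of a video or animated GIF, each with a Url, Bitrate, and ContentType. One way a consumer might use it is to pick the highest-bitrate variant of a single content type. The hypothetical sketch below does that and ranks variants without a bitrate lowest; the URLs and content-type strings in the example data are illustrative.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: choose a video variant by bitrate. Variant mirrors the
// ExtendedMediaEntity.Variant schema (Url, Bitrate, ContentType).
class BestVariant {
    static final class Variant {
        final String url;
        final Integer bitrate; // optional Int32; may be missing
        final String contentType;

        Variant(String url, Integer bitrate, String contentType) {
            this.url = url;
            this.bitrate = bitrate;
            this.contentType = contentType;
        }
    }

    // Returns the URL of the highest-bitrate variant with the given content type,
    // or null if none matches. A missing bitrate ranks below any reported one.
    static String bestUrl(List<Variant> variants, String contentType) {
        Variant best = null;
        for (Variant v : variants) {
            if (!contentType.equals(v.contentType)) {
                continue;
            }
            if (best == null || rank(v) > rank(best)) {
                best = v;
            }
        }
        return best == null ? null : best.url;
    }

    private static long rank(Variant v) {
        return v.bitrate == null ? -1L : v.bitrate;
    }

    public static void main(String[] args) {
        List<Variant> variants = Arrays.asList(
                new Variant("https://example.com/low.mp4", 320000, "video/mp4"),
                new Variant("https://example.com/high.mp4", 2176000, "video/mp4"));
        System.out.println(bestUrl(variants, "video/mp4"));
    }
}
```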

com.github.jcustenborder.kafka.connect.twitter.HashtagEntity

| Name | Optional | Schema | Default Value | Documentation |
|---|---|---|---|---|
| Text | true | String | | Returns the text of the hashtag without #. |
| Start | true | Int32 | | Returns the index of the start character of the hashtag. |
| End | true | Int32 | | Returns the index of the end character of the hashtag. |
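Start and End locate the hashtag inside the status Text. The hypothetical sketch below slices that span back out, assuming Start is inclusive and points at the # character, End is exclusive, and both count Unicode code points, as Twitter's API documents for entity indices. Java strings are indexed in UTF-16 units, so the sketch converts with String.offsetByCodePoints instead of calling substring directly; the difference matters when the text contains emoji or other characters outside the Basic Multilingual Plane.

```java
// Hypothetical sketch: recover an entity's slice of the status text from its
// Start/End values, ASSUMING they are code-point offsets with Start inclusive
// and End exclusive.
class EntitySpan {
    static String slice(String text, int start, int end) {
        // Convert code-point offsets to UTF-16 indices before substring().
        int from = text.offsetByCodePoints(0, start);
        int to = text.offsetByCodePoints(from, end - start);
        return text.substring(from, to);
    }

    // A HashtagEntity's Text omits the '#', so the slice should be "#" + Text.
    static boolean matchesHashtag(String text, int start, int end, String hashtagText) {
        return slice(text, start, end).equals("#" + hashtagText);
    }

    public static void main(String[] args) {
        String text = "\uD83D\uDE00 #kafka rocks"; // an emoji outside the BMP, then a hashtag
        System.out.println(slice(text, 2, 8)); // prints "#kafka"
    }
}
```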

com.github.jcustenborder.kafka.connect.twitter.MediaEntity

| Name | Optional | Schema | Default Value | Documentation |
|---|---|---|---|---|
| Id | true | Int64 | | Returns the id of the media. |
| Type | true | String | | Returns the media type (photo, video, or animated_gif). |
| MediaURL | true | String | | Returns the media URL. |
| Sizes | false | Map of <Int32, com.github.jcustenborder.kafka.connect.twitter.MediaEntity.Size> | | Returns size variations of the media. |
| MediaURLHttps | true | String | | Returns the media secure URL. |
| VideoAspectRatioWidth | true | Int32 | | |
| VideoAspectRatioHeight | true | Int32 | | |
| VideoDurationMillis | true | Int64 | | |
| VideoVariants | true | Array of com.github.jcustenborder.kafka.connect.twitter.ExtendedMediaEntity.Variant | | |
| ExtAltText | true | String | | |
| URL | true | String | | Returns the URL mentioned in the tweet. |
| Text | true | String | | Returns the URL mentioned in the tweet. |
| ExpandedURL | true | String | | Returns the expanded URL if the mentioned URL is shortened. |
| Start | true | Int32 | | Returns the index of the start character of the URL mentioned in the tweet. |
| End | true | Int32 | | Returns the index of the end character of the URL mentioned in the tweet. |
| DisplayURL | true | String | | Returns the display URL if the mentioned URL is shortened. |

com.github.jcustenborder.kafka.connect.twitter.SymbolEntity

| Name | Optional | Schema | Default Value | Documentation |
|---|---|---|---|---|
| Start | true | Int32 | | Returns the index of the start character of the symbol. |
| End | true | Int32 | | Returns the index of the end character of the symbol. |
| Text | true | String | | Returns the text of the entity |

com.github.jcustenborder.kafka.connect.twitter.URLEntity

| Name | Optional | Schema | Default Value | Documentation |
|---|---|---|---|---|
| URL | true | String | | Returns the URL mentioned in the tweet. |
| Text | true | String | | Returns the URL mentioned in the tweet. |
| ExpandedURL | true | String | | Returns the expanded URL if the mentioned URL is shortened. |
| Start | true | Int32 | | Returns the index of the start character of the URL mentioned in the tweet. |
| End | true | Int32 | | Returns the index of the end character of the URL mentioned in the tweet. |
| DisplayURL | true | String | | Returns the display URL if the mentioned URL is shortened. |
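A URLEntity pairs the shortened URL that appears in the status text with its ExpandedURL and its Start and End offsets. The hypothetical sketch below rewrites a status text with the expanded URLs. It applies the entities from last to first, so replacing one span never shifts the offsets of the spans before it, and it assumes Start and End are non-overlapping code-point offsets (Start inclusive, End exclusive). The t.co and example.com URLs in the example are illustrative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: replace shortened URLs in a status text with ExpandedURL.
// ASSUMES Start/End are non-overlapping code-point offsets (Start inclusive,
// End exclusive).
class ExpandUrls {
    static final class UrlEntity {
        final int start;
        final int end;
        final String expandedUrl; // optional String; may be missing

        UrlEntity(int start, int end, String expandedUrl) {
            this.start = start;
            this.end = end;
            this.expandedUrl = expandedUrl;
        }
    }

    static String expand(String text, List<UrlEntity> entities) {
        // Work from the last entity to the first: each replacement only changes
        // text at or after its own start, so earlier offsets stay valid.
        List<UrlEntity> ordered = new ArrayList<>(entities);
        ordered.sort(Comparator.comparingInt((UrlEntity e) -> e.start).reversed());
        StringBuilder out = new StringBuilder(text);
        for (UrlEntity e : ordered) {
            if (e.expandedUrl == null) {
                continue; // nothing to expand; keep the original URL
            }
            int from = text.offsetByCodePoints(0, e.start);
            int to = text.offsetByCodePoints(from, e.end - e.start);
            out.replace(from, to, e.expandedUrl);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        String text = "see https://t.co/a and https://t.co/b";
        List<UrlEntity> urls = Arrays.asList(
                new UrlEntity(4, 18, "https://example.com/first"),
                new UrlEntity(23, 37, "https://example.com/second"));
        System.out.println(expand(text, urls));
    }
}
```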

com.github.jcustenborder.kafka.connect.twitter.UserMentionEntity

| Name | Optional | Schema | Default Value | Documentation |
|---|---|---|---|---|
| Name | true | String | | Returns the name mentioned in the status. |
| Id | true | Int64 | | Returns the user id mentioned in the status. |
| Text | true | String | | Returns the screen name mentioned in the status. |
| ScreenName | true | String | | Returns the screen name mentioned in the status. |
| Start | true | Int32 | | Returns the index of the start character of the user mention. |
| End | true | Int32 | | Returns the index of the end character of the user mention. |

Running in development

mvn clean package
export CLASSPATH="$(find target/ -type f -name '*.jar' | grep '\-package' | tr '\n' ':')"
$CONFLUENT_HOME/bin/connect-standalone connect/connect-avro-docker.properties config/TwitterSourceConnector.properties