root/trunk/plagger/lib/Plagger/Plugin/Aggregator/Xango.pm

Revision 683 (checked in by miyagawa, 14 years ago)
  • Plagger::Util::decode_content now takes $content or $res
  • Try xml encoding header first, before HTML meta tag to guess charsets
  • Subscription::XOXO to handle multibyte title okay
  • Unhandled feed is removed from Subscription
  • use Plagger::UserAgent in Util.pm
  • Planet: Stop using Scrubber for now
  • Aggregator::Xango to handle auto-discovered feed mapping as well
  • Property svn:keywords set to Id
Line 
1 # $Id$
2 #
3 # Copyright (c) 2006 Daisuke Maki <dmaki@cpan.org>
4 # All rights reserved.
5
6 package Plagger::Plugin::Aggregator::Xango;
7 use strict;
8 use base qw( Plagger::Plugin::Aggregator::Simple );
9 use POE;
10 use Xango::Broker::Push;
11 # BEGIN { sub Xango::DEBUG { 1 } } # uncomment to get Xango debug messages
12
13 our $VERSION = '0.1';
14
# Plugin entry point: spawns the crawler session and the Xango push broker,
# then hooks this plugin into the Plagger context.
#
# Configuration keys read from $self->conf:
#   timeout      - HTTP timeout handed to the broker (defaults to 10)
#   xango_args   - hashref merged over the default broker arguments
#   use_cache    - whether conditional GETs are used (defaults to 1)
#   max_redirect - redirect budget handed to the crawler (defaults to 3)
sub register {
    my ($self, $context) = @_;
    my $conf = $self->conf;

    my $timeout   = $conf->{timeout} || 10;
    my $overrides = $conf->{xango_args} || {};

    # User-supplied xango_args come last so they win over the defaults.
    my %xango_args = (
        Alias        => 'xgbroker',
        HandlerAlias => 'xghandler',
        HttpCompArgs => [
            Agent   => "Plagger/$Plagger::VERSION (http://plagger.org/)",
            Timeout => $timeout,
        ],
        %$overrides,
    );

    # Remember the broker alias; aggregate() posts jobs to it later.
    my $broker_alias = $self->{xango_alias} = $xango_args{Alias};

    # An explicit (even false) use_cache setting is honoured; absent means on.
    my $use_cache = exists $conf->{use_cache} ? $conf->{use_cache} : 1;

    Plagger::Plugin::Aggregator::Xango::Crawler->spawn(
        Plugin      => $self,
        UseCache    => $use_cache,
        BrokerAlias => $broker_alias,
        MaxRedirect => $conf->{max_redirect} || 3,
    );
    Xango::Broker::Push->spawn(%xango_args);

    $context->register_hook(
        $self,
        'customfeed.handle'   => \&aggregate,
        'aggregator.finalize' => \&finalize,
    );
}
39
# 'customfeed.handle' hook: queues an HTTP(S) feed URL for fetching.
# Returns nothing for non-HTTP(S) URLs; otherwise returns whatever
# POE::Kernel->post returns.
sub aggregate {
    my ($self, $context, $args) = @_;

    my $feed = $args->{feed};
    my $url  = $feed->url;
    return if $url !~ m!^https?://!i;

    # Remember which feed object this URL belongs to, so the response
    # can be matched back to it in handle_feed().
    $self->{_url2feed}->{$url} = $feed;

    $context->log(info => "Fetch $url");

    my $job = Xango::Job->new(uri => URI->new($url), redirect => 0);
    POE::Kernel->post($self->{xango_alias}, 'enqueue_job', $job);
}
51
# Delegates to the parent aggregator's handle_feed, additionally passing the
# feed object previously recorded for $url (undef if none was recorded).
sub handle_feed {
    my $self = shift;
    my ($url, $xml_ref) = @_;

    my $feed = $self->{_url2feed}->{$url};
    return $self->SUPER::handle_feed($url, $xml_ref, $feed);
}
56
# 'aggregator.finalize' hook: starts the POE event loop, which processes the
# jobs queued by aggregate(). The hook arguments are accepted but unused.
sub finalize {
    my ($self, $context, $args) = @_;

    return POE::Kernel->run();
}
61
62 package Plagger::Plugin::Aggregator::Xango::Crawler;
63 use strict;
64 use Feed::Find;
65 use POE;
66 use Storable qw(freeze thaw);
67 use XML::Feed;
68
# POE state handler; unconditionally answers true.
# NOTE(review): presumably consulted by the Xango broker to decide whether a
# job may be fetched -- confirm against the Xango documentation.
sub apply_policy {
    return 1;
}
# Creates the crawler's POE session.
#
# Named arguments:
#   Plugin      - the owning plugin object
#   UseCache    - true to enable conditional requests and cache updates
#   BrokerAlias - alias of the broker session that jobs are posted to
#   MaxRedirect - redirect budget checked in handle_response
#
# Returns whatever POE::Session->create returns.
sub spawn {
    my ($class, %args) = @_;

    # Per-session state, read back through $_[HEAP] in the state handlers.
    my %heap = (
        PLUGIN       => $args{Plugin},
        USE_CACHE    => $args{UseCache},
        BROKER_ALIAS => $args{BrokerAlias},
        MaxRedirect  => $args{MaxRedirect},
    );

    # Subs of this package that are exposed as POE states.
    my @states = qw(_start _stop apply_policy prep_request handle_response);

    return POE::Session->create(
        heap           => \%heap,
        package_states => [ $class => \@states ],
    );
}
85
# POE _start state: registers this session under the alias 'xghandler'.
sub _start {
    my $kernel = $_[KERNEL];
    return $kernel->alias_set('xghandler');
}
# POE _stop state: intentionally does nothing.
sub _stop {
    return;
}
# POE state: decorates an outgoing request with conditional-GET headers taken
# from the plugin cache. Does nothing when caching is disabled or when there
# is no cache entry for the job's URI.
#
# Arguments (POE): ARG0 is the job, ARG1 is the request object.
# NOTE(review): the request is assumed to offer the HTTP::Headers-style
# header()/if_modified_since() methods, as the original code used both.
sub prep_request {
    my $heap = $_[HEAP];
    return unless $heap->{USE_CACHE};

    my ($job, $req) = @_[ARG0, ARG1];

    my $cached = $heap->{PLUGIN}->cache->get($job->uri);
    return unless $cached;

    if (my $last_modified = $cached->{LastModified}) {
        if ($last_modified =~ /\A\d+\z/) {
            # Epoch seconds: let the date accessor format the header.
            $req->if_modified_since($last_modified);
        }
        else {
            # handle_response caches the raw Last-Modified header string.
            # if_modified_since() expects epoch seconds and would turn a
            # date string into 1970-01-01, so send the string back verbatim.
            $req->header('If-Modified-Since', $last_modified);
        }
    }

    $req->header('If-None-Match', $cached->{ETag})
        if $cached->{ETag};

    return;
}
103
# POE state: processes the HTTP response attached to a finished job.
#
#   * Redirects (301/302/303/307/308) are followed by enqueueing a new job,
#     as long as the redirect budget (heap MaxRedirect) is not exhausted.
#   * A successful response whose content type is a known feed type is
#     handed to the plugin's handle_feed, and its validators are cached.
#   * Any other successful response is scanned for an auto-discoverable feed
#     link; the first one found is enqueued as a new job.
#
# Arguments (POE): ARG0 is the job; its 'redirect' note counts the hops taken
# so far and its 'http_response' note holds the response object.
sub handle_response {
    my $job    = $_[ARG0];
    my $heap   = $_[HEAP];
    my $plugin = $heap->{PLUGIN};

    # Redirects and auto-discovery hops share one counter.
    my $redirect = $job->notes('redirect') + 1;
    return if $redirect > $heap->{MaxRedirect};

    my $r   = $job->notes('http_response');
    my $url = $job->uri;

    if ($r->code =~ /^30[12378]$/) {
        my $location = $r->header('Location');
        return unless defined $location && length $location;

        # Location may be relative; resolve it against the requested URI.
        my $next = URI->new_abs($location, $url);
        return unless $next =~ m!^https?://!i;

        # Alias the feed under the redirect target so that handle_feed can
        # still find it; an existing mapping for the target is left alone.
        $plugin->{_url2feed}->{$next} ||= $plugin->{_url2feed}->{$url};

        $_[KERNEL]->post(
            $heap->{BROKER_ALIAS}, 'enqueue_job',
            Xango::Job->new(uri => $next, redirect => $redirect),
        );
        return;
    }

    return unless $r->is_success;

    my $ct = $r->content_type;
    unless ($Feed::Find::IsFeed{$ct}) {
        # Not a feed: try auto-discovery on the returned document.
        my @feeds = Feed::Find->find_in_html($r->content_ref, $url);
        return unless @feeds;

        my $feed_url = $feeds[0];
        return unless $feed_url =~ m!^https?://!i;

        # Alias the feed so it can be looked up with $feed_url, too.
        $plugin->{_url2feed}->{$feed_url} = $plugin->{_url2feed}->{$url};

        $_[KERNEL]->post(
            $heap->{BROKER_ALIAS}, 'enqueue_job',
            Xango::Job->new(uri => URI->new($feed_url), redirect => $redirect),
        );
        return;
    }

    $plugin->handle_feed($url, $r->content_ref);

    # Remember the validators so prep_request can issue a conditional GET.
    if ($heap->{USE_CACHE}) {
        $plugin->cache->set(
            $job->uri,
            {
                ETag         => $r->header('ETag'),
                LastModified => $r->header('Last-Modified'),
            },
        );
    }
}
147
148 1;
149
Note: See TracBrowser for help on using the browser.