root/trunk/plagger/lib/Plagger/Plugin/Aggregator/Xango.pm

Revision 1937 (checked in by daisuke, 13 years ago)

Use Plagger::FeedParser->discover, and act more like Aggregator::Simple

  • Property svn:keywords set to Id
Line 
1 # $Id$
2 #
3 # Copyright (c) 2006 Daisuke Maki <dmaki@cpan.org>
4 # All rights reserved.
5
6 package Plagger::Plugin::Aggregator::Xango;
7 use strict;
8 use base qw( Plagger::Plugin::Aggregator::Simple );
9 use Plagger::FeedParser;
10 use URI::Fetch;
11 use HTTP::Status;
12 use POE;
13 use Xango::Broker::Push;
14 # BEGIN { sub Xango::DEBUG { 1 } } # uncomment to get Xango debug messages
15
16 our $VERSION = '0.1';
17
# Hook registration.  Spawns the POE session that receives Xango's
# callbacks (Plagger::Plugin::Aggregator::Xango::Crawler) and the Xango
# push broker, then hooks this plugin into Plagger:
#
#   customfeed.handle   -> aggregate(): enqueue one fetch job per feed
#   aggregator.finalize -> finalize():  run the POE kernel, which executes
#                                       the queued jobs
#
# Config keys read here: agent, timeout, use_cache (default 1),
# max_redirect (default 3) and xango_args (a hashref merged over the
# broker defaults below and passed to Xango::Broker::Push->spawn).
sub register {
    my($self, $context) = @_;

    my $conf = $self->conf;

    my %xango_args = (
        Alias        => 'xgbroker',
        HandlerAlias => 'xghandler',
        HttpCompArgs => [
            Agent   => $conf->{agent} || "Plagger/$Plagger::VERSION (http://plagger.org/)",
            Timeout => $conf->{timeout} || 10,
        ],
        %{ $conf->{xango_args} || {} },
    );
    $self->{xango_alias} = $xango_args{Alias};

    Plagger::Plugin::Aggregator::Xango::Crawler->spawn(
        Plugin       => $self,
        # exists() rather than || so that "use_cache: 0" disables caching
        UseCache     => exists $conf->{use_cache} ? $conf->{use_cache} : 1,
        BrokerAlias  => $xango_args{Alias},
        # The broker delivers its callbacks to HandlerAlias, so the crawler
        # session must register under that same alias -- also when it was
        # overridden through xango_args.
        HandlerAlias => $xango_args{HandlerAlias},
        MaxRedirect  => $conf->{max_redirect} || 3,
    );
    Xango::Broker::Push->spawn(%xango_args);

    $context->register_hook(
        $self,
        'customfeed.handle'   => \&aggregate,
        'aggregator.finalize' => \&finalize,
    );
}
45
# customfeed.handle hook.  Schedules an asynchronous fetch of the feed's
# URL with the Xango broker; the download itself happens later, inside
# POE::Kernel->run (see finalize()).  URLs that are not http(s) are
# ignored and an empty (false) value is returned.  Otherwise returns the
# result of POE::Kernel->post.
sub aggregate {
    my($self, $context, $args) = @_;

    my $feed = $args->{feed};
    my $url  = $feed->url;
    return if $url !~ m!^https?://!i;

    # Remember which Plagger::Feed this URL belongs to, so handle_feed()
    # can look it up again once the response arrives.
    $self->{_url2feed}{$url} = $feed;

    $context->log(info => "Fetch $url");

    my $fetch_job = Xango::Job->new(
        uri                 => URI->new($url),
        redirect            => 0,
        is_original_request => 1,
    );
    POE::Kernel->post($self->{xango_alias}, 'enqueue_job', $fetch_job);
}
63
# Hands a downloaded feed document to Aggregator::Simple's handle_feed().
# The Plagger::Feed object is looked up by $url: aggregate() registers it
# for the subscribed URL, and the crawler may alias it under other URLs.
sub handle_feed {
    my $self = shift;
    my($url, $xml_ref) = @_;

    my $feed = $self->{_url2feed}{$url};
    return $self->SUPER::handle_feed($url, $xml_ref, $feed);
}
68
# aggregator.finalize hook.  Runs the POE event loop; every fetch job
# queued by aggregate() (and any follow-up jobs the crawler enqueues) is
# executed from inside this call.  The hook arguments are not used.
sub finalize {
    return POE::Kernel->run();
}
73
74 package Plagger::Plugin::Aggregator::Xango::Crawler;
75 use strict;
76 use Feed::Find;
77 use POE;
78 use Storable qw(freeze thaw);
79 use XML::Feed;
80
# Xango policy state (registered in spawn()): always answers true, i.e.
# presumably lets the broker fetch every job -- NOTE(review): confirm
# against Xango's policy contract.
sub apply_policy { return 1 }
# Creates the POE session that receives Xango's callbacks.
#
# Named arguments:
#   Plugin       - the Plagger::Plugin::Aggregator::Xango instance
#   UseCache     - true to send and record conditional-GET validators
#   BrokerAlias  - alias of the Xango broker session jobs are posted to
#   HandlerAlias - alias this session registers under in _start; must be
#                  the HandlerAlias the broker was given.  Defaults to
#                  'xghandler' (the broker default used by register()).
#   MaxRedirect  - maximum number of hops handle_response() will follow
#
# Returns the value of POE::Session->create.
sub spawn  {
    my $class = shift;
    my %args  = @_;

    POE::Session->create(
        heap => {
            PLUGIN        => $args{Plugin},
            USE_CACHE     => $args{UseCache},
            BROKER_ALIAS  => $args{BrokerAlias},
            HANDLER_ALIAS => defined $args{HandlerAlias}
                ? $args{HandlerAlias} : 'xghandler',
            MaxRedirect   => $args{MaxRedirect},
        },
        package_states => [
            $class => [ qw(_start _stop apply_policy prep_request handle_response) ],
        ],
    );
}
97
# POE _start state: registers this session under the alias the Xango
# broker delivers its callbacks to.  Uses the alias spawn() stored in the
# heap, falling back to the historical 'xghandler' when none was stored.
sub _start {
    my $alias = $_[HEAP]->{HANDLER_ALIAS};
    $alias = 'xghandler' unless defined $alias;
    $_[KERNEL]->alias_set($alias);
}
# POE _stop state: there is nothing to release.
sub _stop { return }
# Xango callback run before a job's request is sent (ARG0 = job,
# ARG1 = request object).  When caching is enabled and handle_response()
# recorded validators for this URI, turns the request into a conditional
# GET (If-Modified-Since / If-None-Match).
sub prep_request {
    return unless $_[HEAP]->{USE_CACHE};

    my($job, $req) = @_[ARG0, ARG1];
    my $plugin     = $_[HEAP]->{PLUGIN};

    my $ref = $plugin->cache->get($job->uri) or return;

    if (my $last_modified = $ref->{LastModified}) {
        # handle_response() stores the raw Last-Modified header *string*.
        # HTTP::Headers::if_modified_since() expects epoch seconds and would
        # turn such a string into "Thu, 01 Jan 1970 ...", defeating the
        # conditional GET.  Echo the server's own date back verbatim instead;
        # purely numeric values are treated as epoch seconds.
        if ($last_modified =~ /^\d+$/) {
            $req->if_modified_since($last_modified);
        }
        else {
            $req->header('If-Modified-Since', $last_modified);
        }
    }
    $req->header('If-None-Match', $ref->{ETag})
        if $ref->{ETag};
}
115
# Xango callback run once a job's response is available (ARG0 = job; the
# response object is stored in $job->notes('http_response')).
#
# Job notes used:
#   redirect            - hops (redirects / feed discovery) already taken
#   is_original_request - true for the subscribed URL and for the targets
#                         it redirects to; only these go through discovery
#
# Flow:
#   301/302/303/307 - re-enqueue the Location target (resolved against the
#                     request URI) unless MaxRedirect hops were taken.
#   304             - unchanged since the conditional GET; only validators
#                     are cached (not the body), so nothing is parsed.
#   other non-2xx   - logged as an error.
#   2xx             - parse the feed (after discovery for original
#                     requests) and, if caching is on, record the
#                     ETag / Last-Modified validators.
sub handle_response {
    my $job     = $_[ARG0];
    my $heap    = $_[HEAP];
    my $plugin  = $heap->{PLUGIN};
    my $context = Plagger->context;

    my $hops = $job->notes('redirect') || 0;
    my $url  = $job->uri;
    my $r    = $job->notes('http_response');

    unless ($r) {
        $context->log(error => "Fetch for $url failed: no response");
        return;
    }

    if ($r->code =~ /^30[1237]$/) {
        if ($hops + 1 > $heap->{MaxRedirect}) {
            $context->log(error => "Fetch for $url failed: too many redirects");
            return;
        }

        my $location = $r->header('Location');
        unless (defined $location && length $location) {
            $context->log(error => "Fetch for $url failed: " . $r->code . " without Location");
            return;
        }

        # Location may be a relative reference; resolve it against the URI
        # that was actually requested.
        my $next = URI->new_abs($location, $url);
        return unless $next =~ m!^https?://!i;

        # The redirect target still belongs to the same Plagger::Feed, and it
        # is still the "original" request as far as feed discovery goes.
        $plugin->{_url2feed}->{$next} = $plugin->{_url2feed}->{$url}
            if exists $plugin->{_url2feed}->{$url};
        $_[KERNEL]->post($heap->{BROKER_ALIAS}, 'enqueue_job', Xango::Job->new(
            uri                 => $next,
            redirect            => $hops + 1,
            is_original_request => $job->notes('is_original_request'),
        ));
        return;
    }

    if ($r->code == HTTP::Status::RC_NOT_MODIFIED()) {
        # Answer to the conditional GET built in prep_request().
        # NOTE(review): the body is not cached, so this feed yields no
        # entries in this run.
        $context->log(info => "$url is not modified; skipped");
        return;
    }

    unless ($r->is_success) {
        $context->log(error => "Fetch for $url failed: " . $r->code);
        return;
    }

    if (! $job->notes('is_original_request')) {
        # Follow-up job (discovered feed URL): the body is the feed itself.
        $plugin->handle_feed($url, $r->content_ref);
    }
    else {
        # Aggregator::Simple runs feed discovery first thing in aggregate();
        # through Xango we can only do so once the subscribed URL has been
        # fetched.  If the content is a feed, parse it right away; otherwise
        # fetch the feed URL the document advertises.
        #
        # Plagger::FeedParser->discover() wants a URI::Fetch::Response.
        my $feed_url = Plagger::FeedParser->discover(TO_URI_FETCH_RESPONSE($r));
        if (! $feed_url) {
            $context->log(error => "Can't find a feed in $url");
        }
        elsif ($feed_url eq $url) {
            $plugin->handle_feed($url, $r->content_ref);
        }
        else {
            # Alias the Plagger::Feed so handle_feed() finds it under the
            # discovered URL as well.
            $plugin->{_url2feed}->{$feed_url} = $plugin->{_url2feed}->{$url};
            $_[KERNEL]->post($heap->{BROKER_ALIAS}, 'enqueue_job',
                Xango::Job->new(uri => URI->new($feed_url), redirect => $hops + 1));
        }
    }

    if ($heap->{USE_CACHE}) {
        $plugin->cache->set(
            $job->uri,
            {
                ETag         => $r->header('ETag'),
                LastModified => $r->header('Last-Modified'),
            },
        );
    }
}
169
# Wraps an HTTP response in a URI::Fetch::Response, which is the type
# Plagger::FeedParser->discover() accepts.  The URI::Fetch status is
# chosen as follows:
#   - previous response in the chain was 301 -> URI_MOVED_PERMANENTLY
#   - 410 -> URI_GONE
#   - 304 -> URI_NOT_MODIFIED
#   - anything else -> URI_OK
# Requires $http_res->request to be set (its URI becomes ->uri).
sub TO_URI_FETCH_RESPONSE {
    my $http_res = shift;

    my $code     = $http_res->code;
    my $previous = $http_res->previous;

    my $fetch_status;
    if ($previous && $previous->code == HTTP::Status::RC_MOVED_PERMANENTLY()) {
        $fetch_status = URI::Fetch::URI_MOVED_PERMANENTLY();
    }
    elsif ($code == HTTP::Status::RC_GONE()) {
        $fetch_status = URI::Fetch::URI_GONE();
    }
    elsif ($code == HTTP::Status::RC_NOT_MODIFIED()) {
        $fetch_status = URI::Fetch::URI_NOT_MODIFIED();
    }
    else {
        $fetch_status = URI::Fetch::URI_OK();
    }

    my $fetch_res = URI::Fetch::Response->new();
    $fetch_res->http_status($code);
    $fetch_res->http_response($http_res);
    $fetch_res->status($fetch_status);
    $fetch_res->etag($http_res->header('ETag'));
    $fetch_res->last_modified($http_res->header('Last-Modified'));
    $fetch_res->uri($http_res->request->uri);
    $fetch_res->content($http_res->content);
    $fetch_res->content_type($http_res->content_type);

    return $fetch_res;
}
191
192 1;
193
Note: See TracBrowser for help on using the browser.